From 2e9127314817bbb269bf2db52746a35f24f5d2c0 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 28 Mar 2026 01:48:20 +0000 Subject: [PATCH 1/3] Elixir bindings initial PR --- Cargo.toml | 2 +- bindings/elixir/.formatter.exs | 20 + bindings/elixir/.gitignore | 9 + bindings/elixir/README.md | 60 +++ bindings/elixir/lib/fluss.ex | 53 +++ bindings/elixir/lib/fluss/admin.ex | 98 +++++ bindings/elixir/lib/fluss/append_writer.ex | 71 +++ bindings/elixir/lib/fluss/config.ex | 58 +++ bindings/elixir/lib/fluss/connection.ex | 48 +++ bindings/elixir/lib/fluss/log_scanner.ex | 87 ++++ bindings/elixir/lib/fluss/native.ex | 88 ++++ bindings/elixir/lib/fluss/schema.ex | 87 ++++ bindings/elixir/lib/fluss/table.ex | 48 +++ bindings/elixir/lib/fluss/table_descriptor.ex | 54 +++ bindings/elixir/lib/fluss/write_handle.ex | 40 ++ bindings/elixir/mix.exs | 61 +++ bindings/elixir/mix.lock | 13 + bindings/elixir/native/fluss_nif/Cargo.toml | 32 ++ bindings/elixir/native/fluss_nif/src/admin.rs | 108 +++++ .../native/fluss_nif/src/append_writer.rs | 71 +++ bindings/elixir/native/fluss_nif/src/atoms.rs | 44 ++ .../elixir/native/fluss_nif/src/config.rs | 73 ++++ .../elixir/native/fluss_nif/src/connection.rs | 40 ++ bindings/elixir/native/fluss_nif/src/lib.rs | 46 ++ .../native/fluss_nif/src/log_scanner.rs | 170 ++++++++ .../native/fluss_nif/src/row_convert.rs | 256 +++++++++++ .../elixir/native/fluss_nif/src/schema.rs | 160 +++++++ bindings/elixir/native/fluss_nif/src/table.rs | 85 ++++ .../native/fluss_nif/src/write_handle.rs | 48 +++ bindings/elixir/test/fluss_test.exs | 112 +++++ .../test/integration/log_table_test.exs | 407 ++++++++++++++++++ bindings/elixir/test/support/cluster.ex | 211 +++++++++ bindings/elixir/test/test_helper.exs | 27 ++ 33 files changed, 2786 insertions(+), 1 deletion(-) create mode 100644 bindings/elixir/.formatter.exs create mode 100644 bindings/elixir/.gitignore create mode 100644 bindings/elixir/README.md create mode 100644 bindings/elixir/lib/fluss.ex create mode 100644 bindings/elixir/lib/fluss/admin.ex create mode 100644 bindings/elixir/lib/fluss/append_writer.ex create mode 100644 bindings/elixir/lib/fluss/config.ex create mode 100644 bindings/elixir/lib/fluss/connection.ex create mode 100644 bindings/elixir/lib/fluss/log_scanner.ex create mode 100644 bindings/elixir/lib/fluss/native.ex create mode 100644 bindings/elixir/lib/fluss/schema.ex create mode 100644 bindings/elixir/lib/fluss/table.ex create mode 100644 bindings/elixir/lib/fluss/table_descriptor.ex create mode 100644 bindings/elixir/lib/fluss/write_handle.ex create mode 100644 bindings/elixir/mix.exs create mode 100644 bindings/elixir/mix.lock create mode 100644 bindings/elixir/native/fluss_nif/Cargo.toml create mode 100644 bindings/elixir/native/fluss_nif/src/admin.rs create mode 100644 bindings/elixir/native/fluss_nif/src/append_writer.rs create mode 100644 bindings/elixir/native/fluss_nif/src/atoms.rs create mode 100644 bindings/elixir/native/fluss_nif/src/config.rs create mode 100644 bindings/elixir/native/fluss_nif/src/connection.rs create mode 100644 bindings/elixir/native/fluss_nif/src/lib.rs create mode 100644 bindings/elixir/native/fluss_nif/src/log_scanner.rs create mode 100644 bindings/elixir/native/fluss_nif/src/row_convert.rs create mode 100644 bindings/elixir/native/fluss_nif/src/schema.rs create mode 100644 bindings/elixir/native/fluss_nif/src/table.rs create mode 100644 bindings/elixir/native/fluss_nif/src/write_handle.rs create mode 100644 bindings/elixir/test/fluss_test.exs create mode 100644 bindings/elixir/test/integration/log_table_test.exs create mode 100644 bindings/elixir/test/support/cluster.ex create mode 100644 bindings/elixir/test/test_helper.exs diff --git a/Cargo.toml b/Cargo.toml index d4d262ad..1cfc4df7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ keywords = ["fluss", "streaming-storage", "datalake"] [workspace] resolver = "2" -members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"] +members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp", "bindings/elixir/native/fluss_nif"] [workspace.dependencies] fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", features = ["storage-all"] } diff --git a/bindings/elixir/.formatter.exs b/bindings/elixir/.formatter.exs new file mode 100644 index 00000000..dd63ff52 --- /dev/null +++ b/bindings/elixir/.formatter.exs @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/bindings/elixir/.gitignore b/bindings/elixir/.gitignore new file mode 100644 index 00000000..90277ffb --- /dev/null +++ b/bindings/elixir/.gitignore @@ -0,0 +1,9 @@ +# Elixir build artifacts +_build/ +deps/ + +# Generated NIF shared library +priv/native/ + +# Crash dumps +erl_crash.dump diff --git a/bindings/elixir/README.md b/bindings/elixir/README.md new file mode 100644 index 00000000..656b03c5 --- /dev/null +++ b/bindings/elixir/README.md @@ -0,0 +1,60 @@ +# Fluss Elixir Client + +Elixir client for [Apache Fluss (Incubating)](https://fluss.apache.org/), built on the official Rust client via [Rustler](https://github.com/rusterlium/rustler) NIFs. + +Currently supports **log tables** (append + scan). Primary key (KV) table support is planned. + +## Requirements + +- Elixir >= 1.15 +- Rust stable toolchain (for compiling the NIF) + +## Quick Start + +```elixir +config = Fluss.Config.new("localhost:9123") +conn = Fluss.Connection.new!(config) +admin = Fluss.Admin.new!(conn) + +schema = + Fluss.Schema.build() + |> Fluss.Schema.column("ts", :bigint) + |> Fluss.Schema.column("message", :string) + |> Fluss.Schema.build!() + +:ok = Fluss.Admin.create_table(admin, "my_db", "events", Fluss.TableDescriptor.new!(schema)) + +table = Fluss.Table.get!(conn, "my_db", "events") +writer = Fluss.AppendWriter.new!(table) +Fluss.AppendWriter.append(writer, [1_700_000_000, "hello"]) +:ok = Fluss.AppendWriter.flush(writer) + +scanner = Fluss.LogScanner.new!(table) +:ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) +:ok = Fluss.LogScanner.poll(scanner, 5_000) + +receive do + {:fluss_records, records} -> + for record <- records, do: IO.inspect(record[:row]) +end +``` + +## Data Types + +Simple: `:boolean`, `:tinyint`, `:smallint`, `:int`, `:bigint`, `:float`, `:double`, `:string`, `:bytes`, `:date`, `:time`, `:timestamp`, `:timestamp_ltz` + +Parameterized: `{:decimal, precision, scale}`, `{:char, length}`, `{:binary, length}` + +## Development + +```bash +cd bindings/elixir +mix test # unit tests +mix test --include integration # starts Docker cluster +``` + +Set `FLUSS_BOOTSTRAP_SERVERS` to use an existing cluster. + +## License + +Apache License 2.0 diff --git a/bindings/elixir/lib/fluss.ex b/bindings/elixir/lib/fluss.ex new file mode 100644 index 00000000..c6ceea36 --- /dev/null +++ b/bindings/elixir/lib/fluss.ex @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss do + @moduledoc """ + Elixir client for Apache Fluss (Incubating). + + ## Examples + + config = Fluss.Config.new("localhost:9123") + conn = Fluss.Connection.new!(config) + admin = Fluss.Admin.new!(conn) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("ts", :bigint) + |> Fluss.Schema.column("message", :string) + |> Fluss.Schema.build!() + + :ok = Fluss.Admin.create_table(admin, "my_db", "events", Fluss.TableDescriptor.new!(schema)) + + table = Fluss.Table.get!(conn, "my_db", "events") + writer = Fluss.AppendWriter.new!(table) + Fluss.AppendWriter.append(writer, [1_700_000_000, "hello"]) + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + :ok = Fluss.LogScanner.poll(scanner, 5_000) + receive do + {:fluss_records, records} -> records + end + + """ + + alias Fluss.Native + + def earliest_offset, do: Native.earliest_offset() +end diff --git a/bindings/elixir/lib/fluss/admin.ex b/bindings/elixir/lib/fluss/admin.ex new file mode 100644 index 00000000..dfc1720b --- /dev/null +++ b/bindings/elixir/lib/fluss/admin.ex @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Admin do + @moduledoc """ + Admin client for DDL operations (create/drop databases and tables). + + ## Examples + + admin = Fluss.Admin.new!(conn) + :ok = Fluss.Admin.create_database(admin, "my_db") + :ok = Fluss.Admin.create_table(admin, "my_db", "events", descriptor) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new(Fluss.Connection.t()) :: {:ok, t()} | {:error, String.t()} + def new(conn) do + case Native.admin_new(conn) do + {:error, _} = err -> err + admin -> {:ok, admin} + end + end + + @spec new!(Fluss.Connection.t()) :: t() + def new!(conn) do + case Native.admin_new(conn) do + {:error, reason} -> raise "failed to create admin: #{reason}" + admin -> admin + end + end + + @spec create_database(t(), String.t(), boolean()) :: :ok | {:error, String.t()} + def create_database(admin, name, ignore_if_exists \\ true), + do: Native.admin_create_database(admin, name, ignore_if_exists) + + @spec drop_database(t(), String.t(), boolean()) :: :ok | {:error, String.t()} + def drop_database(admin, name, ignore_if_not_exists \\ true), + do: Native.admin_drop_database(admin, name, ignore_if_not_exists) + + @spec list_databases(t()) :: {:ok, [String.t()]} | {:error, String.t()} + def list_databases(admin) do + case Native.admin_list_databases(admin) do + {:error, _} = err -> err + dbs -> {:ok, dbs} + end + end + + @spec list_databases!(t()) :: [String.t()] + def list_databases!(admin) do + case Native.admin_list_databases(admin) do + {:error, reason} -> raise "failed to list databases: #{reason}" + dbs -> dbs + end + end + + @spec create_table(t(), String.t(), String.t(), Fluss.TableDescriptor.t(), boolean()) :: + :ok | {:error, String.t()} + def create_table(admin, database, table, descriptor, ignore_if_exists \\ true), + do: Native.admin_create_table(admin, database, table, descriptor, ignore_if_exists) + + @spec drop_table(t(), String.t(), String.t(), boolean()) :: :ok | {:error, String.t()} + def drop_table(admin, database, table, ignore_if_not_exists \\ true), + do: Native.admin_drop_table(admin, database, table, ignore_if_not_exists) + + @spec list_tables(t(), String.t()) :: {:ok, [String.t()]} | {:error, String.t()} + def list_tables(admin, database) do + case Native.admin_list_tables(admin, database) do + {:error, _} = err -> err + tables -> {:ok, tables} + end + end + + @spec list_tables!(t(), String.t()) :: [String.t()] + def list_tables!(admin, database) do + case Native.admin_list_tables(admin, database) do + {:error, reason} -> raise "failed to list tables: #{reason}" + tables -> tables + end + end +end diff --git a/bindings/elixir/lib/fluss/append_writer.ex b/bindings/elixir/lib/fluss/append_writer.ex new file mode 100644 index 00000000..5e62b3f3 --- /dev/null +++ b/bindings/elixir/lib/fluss/append_writer.ex @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.AppendWriter do + @moduledoc """ + Writer for appending records to a log table. + + Values are passed as a list in column order. Use `nil` for null values. + `append/2` returns a `Fluss.WriteHandle` — drop it for fire-and-forget, + or call `Fluss.WriteHandle.wait/1` for per-record acknowledgment. + + ## Examples + + writer = Fluss.AppendWriter.new!(table) + + # Fire-and-forget + Fluss.AppendWriter.append(writer, [1_700_000_000, "hello"]) + Fluss.AppendWriter.append(writer, [1_700_000_001, "world"]) + :ok = Fluss.AppendWriter.flush(writer) + + # Per-record ack + {:ok, handle} = Fluss.AppendWriter.append(writer, [1_700_000_002, "critical"]) + :ok = Fluss.WriteHandle.wait(handle) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new(Fluss.Table.t()) :: {:ok, t()} | {:error, String.t()} + def new(table) do + case Native.append_writer_new(table) do + {:error, _} = err -> err + w -> {:ok, w} + end + end + + @spec new!(Fluss.Table.t()) :: t() + def new!(table) do + case Native.append_writer_new(table) do + {:error, reason} -> raise "failed to create append writer: #{reason}" + w -> w + end + end + + @spec append(t(), list()) :: {:ok, Fluss.WriteHandle.t()} | {:error, String.t()} + def append(writer, values) when is_list(values) do + case Native.append_writer_append(writer, values) do + {:error, _} = err -> err + handle -> {:ok, handle} + end + end + + @spec flush(t()) :: :ok | {:error, String.t()} + def flush(writer), do: Native.append_writer_flush(writer) +end diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex new file mode 100644 index 00000000..daeec1f6 --- /dev/null +++ b/bindings/elixir/lib/fluss/config.ex @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Config do + @moduledoc """ + Client configuration for connecting to a Fluss cluster. + + ## Examples + + config = Fluss.Config.new("localhost:9123") + + config = + Fluss.Config.default() + |> Fluss.Config.set_bootstrap_servers("host1:9123,host2:9123") + |> Fluss.Config.set_writer_batch_size(1_048_576) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new(String.t()) :: t() + def new(bootstrap_servers) when is_binary(bootstrap_servers) do + Native.config_new(bootstrap_servers) + end + + @spec default() :: t() + def default, do: Native.config_default() + + @spec set_bootstrap_servers(t(), String.t()) :: t() + def set_bootstrap_servers(config, servers), + do: Native.config_set_bootstrap_servers(config, servers) + + @spec set_writer_batch_size(t(), integer()) :: t() + def set_writer_batch_size(config, size), do: Native.config_set_writer_batch_size(config, size) + + @spec set_writer_batch_timeout_ms(t(), integer()) :: t() + def set_writer_batch_timeout_ms(config, ms), + do: Native.config_set_writer_batch_timeout_ms(config, ms) + + @spec get_bootstrap_servers(t()) :: String.t() + def get_bootstrap_servers(config), do: Native.config_get_bootstrap_servers(config) +end diff --git a/bindings/elixir/lib/fluss/connection.ex b/bindings/elixir/lib/fluss/connection.ex new file mode 100644 index 00000000..20fb6b59 --- /dev/null +++ b/bindings/elixir/lib/fluss/connection.ex @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Connection do + @moduledoc """ + A connection to a Fluss cluster. + + ## Examples + + config = Fluss.Config.new("localhost:9123") + {:ok, conn} = Fluss.Connection.new(config) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new(Fluss.Config.t()) :: {:ok, t()} | {:error, String.t()} + def new(config) do + case Native.connection_new(config) do + {:error, _} = err -> err + conn -> {:ok, conn} + end + end + + @spec new!(Fluss.Config.t()) :: t() + def new!(config) do + case Native.connection_new(config) do + {:error, reason} -> raise "failed to connect to Fluss: #{reason}" + conn -> conn + end + end +end diff --git a/bindings/elixir/lib/fluss/log_scanner.ex b/bindings/elixir/lib/fluss/log_scanner.ex new file mode 100644 index 00000000..f6c2d1be --- /dev/null +++ b/bindings/elixir/lib/fluss/log_scanner.ex @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.LogScanner do + @moduledoc """ + Scanner for reading records from a log table. + + `poll/2` is non-blocking — it returns `:ok` immediately and sends results + as `{:fluss_records, records}` or `{:fluss_poll_error, reason}` to the + calling process. No dirty scheduler threads are held during the wait. + + Each record is an atom-keyed map: `:offset`, `:timestamp`, `:change_type`, `:row`. + Row values are also atom-keyed (column names interned as atoms). + + ## Examples + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + :ok = Fluss.LogScanner.poll(scanner, 5_000) + + receive do + {:fluss_records, records} -> + for record <- records, do: IO.inspect(record[:row]) + {:fluss_poll_error, reason} -> + IO.puts("poll error: \#{reason}") + end + + """ + + alias Fluss.Native + + @type t :: reference() + @type record :: %{atom() => term()} + + @spec new(Fluss.Table.t()) :: {:ok, t()} | {:error, String.t()} + def new(table) do + case Native.log_scanner_new(table) do + {:error, _} = err -> err + s -> {:ok, s} + end + end + + @spec new!(Fluss.Table.t()) :: t() + def new!(table) do + case Native.log_scanner_new(table) do + {:error, reason} -> raise "failed to create log scanner: #{reason}" + s -> s + end + end + + @spec subscribe(t(), integer(), integer()) :: :ok | {:error, String.t()} + def subscribe(scanner, bucket, offset), + do: Native.log_scanner_subscribe(scanner, bucket, offset) + + @doc """ + Subscribes to multiple buckets. Takes a list of `{bucket_id, offset}` tuples. + """ + @spec subscribe_buckets(t(), [{integer(), integer()}]) :: :ok | {:error, String.t()} + def subscribe_buckets(scanner, bucket_offsets) when is_list(bucket_offsets), + do: Native.log_scanner_subscribe_buckets(scanner, bucket_offsets) + + @spec unsubscribe(t(), integer()) :: :ok | {:error, String.t()} + def unsubscribe(scanner, bucket), + do: Native.log_scanner_unsubscribe(scanner, bucket) + + @doc """ + Starts a non-blocking poll. Returns `:ok` immediately. + Results arrive as `{:fluss_records, [record]}` or `{:fluss_poll_error, reason}`. + """ + @spec poll(t(), non_neg_integer()) :: :ok + def poll(scanner, timeout_ms), + do: Native.log_scanner_poll(scanner, timeout_ms) +end diff --git a/bindings/elixir/lib/fluss/native.ex b/bindings/elixir/lib/fluss/native.ex new file mode 100644 index 00000000..a69636ed --- /dev/null +++ b/bindings/elixir/lib/fluss/native.ex @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Native do + @moduledoc false + use Rustler, otp_app: :fluss, crate: "fluss_nif" + + # Config + def config_new(_bootstrap_servers), do: :erlang.nif_error(:nif_not_loaded) + def config_default, do: :erlang.nif_error(:nif_not_loaded) + def config_set_bootstrap_servers(_config, _servers), do: :erlang.nif_error(:nif_not_loaded) + def config_set_writer_batch_size(_config, _size), do: :erlang.nif_error(:nif_not_loaded) + def config_set_writer_batch_timeout_ms(_config, _ms), do: :erlang.nif_error(:nif_not_loaded) + def config_get_bootstrap_servers(_config), do: :erlang.nif_error(:nif_not_loaded) + + # Connection + def connection_new(_config), do: :erlang.nif_error(:nif_not_loaded) + + # Admin + def admin_new(_conn), do: :erlang.nif_error(:nif_not_loaded) + + def admin_create_database(_admin, _name, _ignore_if_exists), + do: :erlang.nif_error(:nif_not_loaded) + + def admin_drop_database(_admin, _name, _ignore_if_not_exists), + do: :erlang.nif_error(:nif_not_loaded) + + def admin_list_databases(_admin), do: :erlang.nif_error(:nif_not_loaded) + + def admin_create_table(_admin, _db, _table, _descriptor, _ignore_if_exists), + do: :erlang.nif_error(:nif_not_loaded) + + def admin_drop_table(_admin, _db, _table, _ignore_if_not_exists), + do: :erlang.nif_error(:nif_not_loaded) + + def admin_list_tables(_admin, _database), do: :erlang.nif_error(:nif_not_loaded) + + # Schema + def schema_builder_new, do: :erlang.nif_error(:nif_not_loaded) + def schema_builder_column(_builder, _name, _data_type), do: :erlang.nif_error(:nif_not_loaded) + def schema_builder_primary_key(_builder, _keys), do: :erlang.nif_error(:nif_not_loaded) + def schema_builder_build(_builder), do: :erlang.nif_error(:nif_not_loaded) + def table_descriptor_new(_schema), do: :erlang.nif_error(:nif_not_loaded) + def table_descriptor_with_bucket_count(_schema, _count), do: :erlang.nif_error(:nif_not_loaded) + + def table_descriptor_with_properties(_schema, _properties), + do: :erlang.nif_error(:nif_not_loaded) + + # Table + def table_get(_conn, _database, _table), do: :erlang.nif_error(:nif_not_loaded) + def table_has_primary_key(_table), do: :erlang.nif_error(:nif_not_loaded) + def table_column_names(_table), do: :erlang.nif_error(:nif_not_loaded) + + # AppendWriter + def append_writer_new(_table), do: :erlang.nif_error(:nif_not_loaded) + def append_writer_append(_writer, _values), do: :erlang.nif_error(:nif_not_loaded) + def append_writer_flush(_writer), do: :erlang.nif_error(:nif_not_loaded) + + # LogScanner + def log_scanner_new(_table), do: :erlang.nif_error(:nif_not_loaded) + def log_scanner_subscribe(_scanner, _bucket, _offset), do: :erlang.nif_error(:nif_not_loaded) + + def log_scanner_subscribe_buckets(_scanner, _bucket_offsets), + do: :erlang.nif_error(:nif_not_loaded) + + def log_scanner_unsubscribe(_scanner, _bucket), do: :erlang.nif_error(:nif_not_loaded) + def log_scanner_poll(_scanner, _timeout_ms), do: :erlang.nif_error(:nif_not_loaded) + + # WriteHandle + def write_handle_wait(_handle), do: :erlang.nif_error(:nif_not_loaded) + + # Constants + def earliest_offset, do: :erlang.nif_error(:nif_not_loaded) +end diff --git a/bindings/elixir/lib/fluss/schema.ex b/bindings/elixir/lib/fluss/schema.ex new file mode 100644 index 00000000..566a9b0c --- /dev/null +++ b/bindings/elixir/lib/fluss/schema.ex @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Schema do + @moduledoc """ + Schema builder for defining table columns and primary keys. + + Simple types: `:boolean`, `:tinyint`, `:smallint`, `:int`, `:bigint`, + `:float`, `:double`, `:string`, `:bytes`, `:date`, `:time`, `:timestamp`, `:timestamp_ltz` + + Parameterized types: `{:decimal, precision, scale}`, `{:char, length}`, `{:binary, length}` + + ## Examples + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("name", :string) + |> Fluss.Schema.column("amount", {:decimal, 10, 2}) + |> Fluss.Schema.build!() + + """ + + alias Fluss.Native + + @type t :: reference() + @type builder :: reference() + + @type data_type :: + :boolean + | :tinyint + | :smallint + | :int + | :bigint + | :float + | :double + | :string + | :bytes + | :date + | :time + | :timestamp + | :timestamp_ltz + | {:decimal, non_neg_integer(), non_neg_integer()} + | {:char, non_neg_integer()} + | {:binary, non_neg_integer()} + + @spec build() :: builder() + def build, do: Native.schema_builder_new() + + @spec column(builder(), String.t(), data_type()) :: builder() + def column(builder, name, data_type) do + case Native.schema_builder_column(builder, name, data_type) do + {:error, reason} -> raise "failed to add column: #{reason}" + ref -> ref + end + end + + @spec primary_key(builder(), [String.t()]) :: builder() + def primary_key(builder, keys) do + case Native.schema_builder_primary_key(builder, keys) do + {:error, reason} -> raise "failed to set primary key: #{reason}" + ref -> ref + end + end + + @spec build!(builder()) :: t() + def build!(builder) do + case Native.schema_builder_build(builder) do + {:error, reason} -> raise "failed to build schema: #{reason}" + ref -> ref + end + end +end diff --git a/bindings/elixir/lib/fluss/table.ex b/bindings/elixir/lib/fluss/table.ex new file mode 100644 index 00000000..7d469244 --- /dev/null +++ b/bindings/elixir/lib/fluss/table.ex @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Table do + @moduledoc """ + A handle to a Fluss table, used to create writers and scanners. + """ + + alias Fluss.Native + + @type t :: reference() + + @spec get(Fluss.Connection.t(), String.t(), String.t()) :: {:ok, t()} | {:error, String.t()} + def get(conn, database, table) do + case Native.table_get(conn, database, table) do + {:error, _} = err -> err + t -> {:ok, t} + end + end + + @spec get!(Fluss.Connection.t(), String.t(), String.t()) :: t() + def get!(conn, database, table) do + case Native.table_get(conn, database, table) do + {:error, reason} -> raise "failed to get table: #{reason}" + t -> t + end + end + + @spec has_primary_key?(t()) :: boolean() + def has_primary_key?(table), do: Native.table_has_primary_key(table) + + @spec column_names(t()) :: [String.t()] + def column_names(table), do: Native.table_column_names(table) +end diff --git a/bindings/elixir/lib/fluss/table_descriptor.ex b/bindings/elixir/lib/fluss/table_descriptor.ex new file mode 100644 index 00000000..612d385e --- /dev/null +++ b/bindings/elixir/lib/fluss/table_descriptor.ex @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.TableDescriptor do + @moduledoc """ + Descriptor for creating a Fluss table. + + Options: `:bucket_count`, `:properties` (list of `{key, value}` string tuples). + + ## Examples + + Fluss.TableDescriptor.new!(schema) + Fluss.TableDescriptor.new!(schema, bucket_count: 3) + + """ + + alias Fluss.Native + + @type t :: reference() + + @spec new!(Fluss.Schema.t(), keyword()) :: t() + def new!(schema, opts \\ []) do + result = + cond do + Keyword.has_key?(opts, :bucket_count) -> + Native.table_descriptor_with_bucket_count(schema, opts[:bucket_count]) + + Keyword.has_key?(opts, :properties) -> + Native.table_descriptor_with_properties(schema, opts[:properties]) + + true -> + Native.table_descriptor_new(schema) + end + + case result do + {:error, reason} -> raise "failed to create table descriptor: #{reason}" + ref -> ref + end + end +end diff --git a/bindings/elixir/lib/fluss/write_handle.ex b/bindings/elixir/lib/fluss/write_handle.ex new file mode 100644 index 00000000..eb1e696b --- /dev/null +++ b/bindings/elixir/lib/fluss/write_handle.ex @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.WriteHandle do + @moduledoc """ + Handle for a pending write operation. + + Returned by `Fluss.AppendWriter.append/2`. Drop for fire-and-forget, + or call `wait/1` for per-record server acknowledgment. + """ + + alias Fluss.Native + + @type t :: reference() + + @spec wait(t()) :: :ok | {:error, String.t()} + def wait(handle), do: Native.write_handle_wait(handle) + + @spec wait!(t()) :: :ok + def wait!(handle) do + case wait(handle) do + :ok -> :ok + {:error, reason} -> raise "write failed: #{reason}" + end + end +end diff --git a/bindings/elixir/mix.exs b/bindings/elixir/mix.exs new file mode 100644 index 00000000..f5d416d8 --- /dev/null +++ b/bindings/elixir/mix.exs @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.MixProject do + use Mix.Project + + @version "0.1.0" + + def project do + [ + app: :fluss, + version: @version, + elixir: "~> 1.15", + start_permanent: Mix.env() == :prod, + elixirc_paths: elixirc_paths(Mix.env()), + deps: deps(), + description: "Elixir client for Apache Fluss", + package: package() + ] + end + + def application do + [ + extra_applications: [:logger] + ] + end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] + + defp deps do + [ + {:rustler, "~> 0.37"}, + {:ex_doc, "~> 0.31", only: :dev, runtime: false}, + {:credo, "~> 1.7", only: [:dev, :test], runtime: false} + ] + end + + defp package do + [ + licenses: ["Apache-2.0"], + links: %{ + "GitHub" => "https://github.com/apache/fluss-rust" + } + ] + end +end diff --git a/bindings/elixir/mix.lock b/bindings/elixir/mix.lock new file mode 100644 index 00000000..b1170d3f --- /dev/null +++ b/bindings/elixir/mix.lock @@ -0,0 +1,13 @@ +%{ + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "credo": {:hex, :credo, "1.7.17", "f92b6aa5b26301eaa5a35e4d48ebf5aa1e7094ac00ae38f87086c562caf8a22f", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1eb5645c835f0b6c9b5410f94b5a185057bcf6d62a9c2b476da971cde8749645"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, + "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, + "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, + "rustler": {:hex, :rustler, "0.37.3", "5f4e6634d43b26f0a69834dd1d3ed4e1710b022a053bf4a670220c9540c92602", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "a6872c6f53dcf00486d1e7f9e046e20e01bf1654bdacc4193016c2e8002b32a2"}, +} diff --git a/bindings/elixir/native/fluss_nif/Cargo.toml b/bindings/elixir/native/fluss_nif/Cargo.toml new file mode 100644 index 00000000..90858606 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/Cargo.toml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "fluss_nif" +version = "0.1.0" +edition = "2024" + +[lib] +name = "fluss_nif" +path = "src/lib.rs" +crate-type = ["cdylib"] + +[dependencies] +fluss = { package = "fluss-rs", path = "../../../../crates/fluss" } +bigdecimal = "0.4" +rustler = "0.37" +tokio = { version = "1", features = ["full"] } diff --git a/bindings/elixir/native/fluss_nif/src/admin.rs b/bindings/elixir/native/fluss_nif/src/admin.rs new file mode 100644 index 00000000..984b4976 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/admin.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::{self, to_nif_err}; +use crate::connection::ConnectionResource; +use crate::schema::TableDescriptorResource; +use crate::RUNTIME; +use fluss::client::FlussAdmin; +use fluss::metadata::TablePath; +use rustler::{Atom, ResourceArc}; +use std::sync::Arc; + +pub struct AdminResource(pub Arc); + +impl std::panic::RefUnwindSafe for AdminResource {} + +#[rustler::resource_impl] +impl rustler::Resource for AdminResource {} + +#[rustler::nif] +fn admin_new(conn: ResourceArc) -> Result, rustler::Error> { + let admin = conn.0.get_admin().map_err(to_nif_err)?; + Ok(ResourceArc::new(AdminResource(admin))) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_create_database( + admin: ResourceArc, + database_name: String, + ignore_if_exists: bool, +) -> Result { + RUNTIME + .block_on(admin.0.create_database(&database_name, None, ignore_if_exists)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_drop_database( + admin: ResourceArc, + database_name: String, + ignore_if_not_exists: bool, +) -> Result { + RUNTIME + .block_on(admin.0.drop_database(&database_name, ignore_if_not_exists, false)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_list_databases(admin: ResourceArc) -> Result, rustler::Error> { + RUNTIME + .block_on(admin.0.list_databases()) + .map_err(to_nif_err) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_create_table( + admin: ResourceArc, + database_name: String, + table_name: String, + descriptor: ResourceArc, + ignore_if_exists: bool, +) -> Result { + let path = TablePath::new(&database_name, &table_name); + RUNTIME + .block_on(admin.0.create_table(&path, &descriptor.0, ignore_if_exists)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_drop_table( + admin: ResourceArc, + database_name: String, + table_name: String, + ignore_if_not_exists: bool, +) -> Result { + let path = TablePath::new(&database_name, &table_name); + RUNTIME + .block_on(admin.0.drop_table(&path, ignore_if_not_exists)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn admin_list_tables( + admin: ResourceArc, + database_name: String, +) -> Result, rustler::Error> { + RUNTIME + .block_on(admin.0.list_tables(&database_name)) + .map_err(to_nif_err) +} diff --git a/bindings/elixir/native/fluss_nif/src/append_writer.rs b/bindings/elixir/native/fluss_nif/src/append_writer.rs new file mode 100644 index 00000000..f60967ff --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/append_writer.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::{self, to_nif_err}; +use crate::row_convert; +use crate::table::TableResource; +use crate::write_handle::WriteHandleResource; +use crate::RUNTIME; +use fluss::client::AppendWriter; +use fluss::metadata::Column; +use rustler::{Atom, Env, ResourceArc, Term}; + +pub struct AppendWriterResource { + pub writer: AppendWriter, + pub columns: Vec, +} + +impl std::panic::RefUnwindSafe for AppendWriterResource {} + +#[rustler::resource_impl] +impl rustler::Resource for AppendWriterResource {} + +#[rustler::nif] +fn append_writer_new( + table: ResourceArc, +) -> Result, rustler::Error> { + // WriterClient::new() calls tokio::spawn internally. + let _guard = RUNTIME.enter(); + let (writer, columns) = table.with_table(|t| { + let writer = t + .new_append() + .map_err(to_nif_err)? + .create_writer() + .map_err(to_nif_err)?; + Ok((writer, t.get_table_info().schema.columns().to_vec())) + })?; + Ok(ResourceArc::new(AppendWriterResource { writer, columns })) +} + +#[rustler::nif] +fn append_writer_append<'a>( + env: Env<'a>, + writer: ResourceArc, + values: Term<'a>, +) -> Result, rustler::Error> { + let row = row_convert::term_to_row(env, values, &writer.columns).map_err(to_nif_err)?; + let future = writer.writer.append(&row).map_err(to_nif_err)?; + Ok(ResourceArc::new(WriteHandleResource::new(future))) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn append_writer_flush(writer: ResourceArc) -> Result { + RUNTIME + .block_on(writer.writer.flush()) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} diff --git a/bindings/elixir/native/fluss_nif/src/atoms.rs b/bindings/elixir/native/fluss_nif/src/atoms.rs new file mode 100644 index 00000000..11e7f771 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/atoms.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +rustler::atoms! { + ok, + nil, + + // Change types + append_only, + insert, + update_before, + update_after, + delete, + + // Poll result message tags + fluss_records, + fluss_poll_error, + + // Record map keys + offset, + timestamp, + change_type, + row, +} + +/// Convert any `Display` error into `rustler::Error::Term`, which the NIF +/// framework encodes as `{:error, reason_string}`. +pub fn to_nif_err(e: impl std::fmt::Display) -> rustler::Error { + rustler::Error::Term(Box::new(e.to_string())) +} diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs new file mode 100644 index 00000000..fde21ed4 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use fluss::config::Config; +use rustler::ResourceArc; + +pub struct ConfigResource(pub Config); + +impl std::panic::RefUnwindSafe for ConfigResource {} + +#[rustler::resource_impl] +impl rustler::Resource for ConfigResource {} + +#[rustler::nif] +fn config_new(bootstrap_servers: String) -> ResourceArc { + let mut config = Config::default(); + config.bootstrap_servers = bootstrap_servers; + ResourceArc::new(ConfigResource(config)) +} + +#[rustler::nif] +fn config_default() -> ResourceArc { + ResourceArc::new(ConfigResource(Config::default())) +} + +#[rustler::nif] +fn config_set_bootstrap_servers( + config: ResourceArc, + servers: String, +) -> ResourceArc { + let mut new_config = config.0.clone(); + new_config.bootstrap_servers = servers; + ResourceArc::new(ConfigResource(new_config)) +} + +#[rustler::nif] +fn config_set_writer_batch_size( + config: ResourceArc, + size: i32, +) -> ResourceArc { + let mut new_config = config.0.clone(); + new_config.writer_batch_size = size; + ResourceArc::new(ConfigResource(new_config)) +} + +#[rustler::nif] +fn config_set_writer_batch_timeout_ms( + config: ResourceArc, + timeout_ms: i64, +) -> ResourceArc { + let mut new_config = config.0.clone(); + new_config.writer_batch_timeout_ms = timeout_ms; + ResourceArc::new(ConfigResource(new_config)) +} + +#[rustler::nif] +fn config_get_bootstrap_servers(config: ResourceArc) -> String { + config.0.bootstrap_servers.clone() +} diff --git a/bindings/elixir/native/fluss_nif/src/connection.rs b/bindings/elixir/native/fluss_nif/src/connection.rs new file mode 100644 index 00000000..0c2db643 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/connection.rs @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::to_nif_err; +use crate::config::ConfigResource; +use crate::RUNTIME; +use fluss::client::FlussConnection; +use rustler::ResourceArc; +use std::sync::Arc; + +pub struct ConnectionResource(pub Arc); + +impl std::panic::RefUnwindSafe for ConnectionResource {} + +#[rustler::resource_impl] +impl rustler::Resource for ConnectionResource {} + +#[rustler::nif(schedule = "DirtyIo")] +fn connection_new( + config: ResourceArc, +) -> Result, rustler::Error> { + let conn = RUNTIME + .block_on(FlussConnection::new(config.0.clone())) + .map_err(to_nif_err)?; + Ok(ResourceArc::new(ConnectionResource(Arc::new(conn)))) +} diff --git a/bindings/elixir/native/fluss_nif/src/lib.rs b/bindings/elixir/native/fluss_nif/src/lib.rs new file mode 100644 index 00000000..31ee3625 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/lib.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Rustler 0.37 wraps every NIF body in `std::panic::catch_unwind`, which requires +// all captured values (including `ResourceArc`) to be `RefUnwindSafe`. +// `ResourceArc` contains `*mut T`, so it is only `RefUnwindSafe` when `T` is. +// Our resource types contain `parking_lot` locks (`UnsafeCell`) which opt out of +// the auto-trait. We manually impl `RefUnwindSafe` on each resource type because +// panic safety is already guaranteed by the NIF boundary — a panic is caught and +// converted to an Erlang exception, never observed by Rust code. + +mod admin; +mod append_writer; +mod atoms; +mod config; +mod connection; +mod log_scanner; +mod row_convert; +mod schema; +mod table; +mod write_handle; + +use std::sync::LazyLock; + +static RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to create tokio runtime") +}); + +rustler::init!("Elixir.Fluss.Native"); diff --git a/bindings/elixir/native/fluss_nif/src/log_scanner.rs b/bindings/elixir/native/fluss_nif/src/log_scanner.rs new file mode 100644 index 00000000..c6021d46 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/log_scanner.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::{self, to_nif_err}; +use crate::row_convert; +use crate::table::TableResource; +use crate::RUNTIME; +use fluss::client::LogScanner; +use fluss::metadata::Column; +use fluss::record::ChangeType; +use rustler::env::OwnedEnv; +use rustler::types::LocalPid; +use rustler::{Atom, Encoder, Env, ResourceArc}; +use std::collections::HashMap; +use std::time::Duration; + +pub struct LogScannerResource { + pub scanner: LogScanner, + pub columns: Vec, +} + +impl std::panic::RefUnwindSafe for LogScannerResource {} + +#[rustler::resource_impl] +impl rustler::Resource for LogScannerResource {} + +#[rustler::nif] +fn log_scanner_new( + table: ResourceArc, +) -> Result, rustler::Error> { + let _guard = RUNTIME.enter(); + let (scanner, columns) = table.with_table(|t| { + let scanner = t + .new_scan() + .create_log_scanner() + .map_err(to_nif_err)?; + Ok((scanner, t.get_table_info().schema.columns().to_vec())) + })?; + Ok(ResourceArc::new(LogScannerResource { scanner, columns })) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe( + scanner: ResourceArc, + bucket: i32, + offset: i64, +) -> Result { + RUNTIME + .block_on(scanner.scanner.subscribe(bucket, offset)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_subscribe_buckets( + scanner: ResourceArc, + bucket_offsets: Vec<(i32, i64)>, +) -> Result { + let map: HashMap = bucket_offsets.into_iter().collect(); + RUNTIME + .block_on(scanner.scanner.subscribe_buckets(&map)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif(schedule = "DirtyIo")] +fn log_scanner_unsubscribe( + scanner: ResourceArc, + bucket: i32, +) -> Result { + RUNTIME + .block_on(scanner.scanner.unsubscribe(bucket)) + .map_err(to_nif_err)?; + Ok(atoms::ok()) +} + +#[rustler::nif] +fn log_scanner_poll( + env: Env, + scanner: ResourceArc, + timeout_ms: u64, +) -> Atom { + let pid = env.pid(); + let scanner = scanner.clone(); + + std::thread::spawn(move || { + let result = RUNTIME.block_on(scanner.scanner.poll(Duration::from_millis(timeout_ms))); + send_poll_result(&pid, result, &scanner.columns); + }); + + atoms::ok() +} + +fn send_poll_result( + pid: &LocalPid, + result: Result, + columns: &[Column], +) { + let mut msg_env = OwnedEnv::new(); + + match result { + Ok(scan_records) => { + let _ = msg_env.send_and_clear(pid, |env| { + let records = encode_scan_records(env, scan_records, columns); + (atoms::fluss_records(), records).encode(env) + }); + } + Err(e) => { + let _ = msg_env.send_and_clear(pid, |env| { + (atoms::fluss_poll_error(), e.to_string()).encode(env) + }); + } + } +} + +fn encode_scan_records<'a>( + env: Env<'a>, + scan_records: fluss::record::ScanRecords, + columns: &[Column], +) -> rustler::Term<'a> { + let column_atoms = row_convert::intern_column_atoms(env, columns); + let mut result = Vec::new(); + + for record in scan_records { + let row_map = match row_convert::row_to_term(env, record.row(), columns, &column_atoms) { + Ok(m) => m, + Err(_) => continue, + }; + let change_type_atom = match record.change_type() { + ChangeType::AppendOnly => atoms::append_only().encode(env), + ChangeType::Insert => atoms::insert().encode(env), + ChangeType::UpdateBefore => atoms::update_before().encode(env), + ChangeType::UpdateAfter => atoms::update_after().encode(env), + ChangeType::Delete => atoms::delete().encode(env), + }; + + if let Ok(record_map) = rustler::Term::map_from_pairs( + env, + &[ + (atoms::offset().encode(env), record.offset().encode(env)), + (atoms::timestamp().encode(env), record.timestamp().encode(env)), + (atoms::change_type().encode(env), change_type_atom), + (atoms::row().encode(env), row_map), + ], + ) { + result.push(record_map); + } + } + + result.encode(env) +} + +#[rustler::nif] +fn earliest_offset() -> i64 { + fluss::client::EARLIEST_OFFSET +} diff --git a/bindings/elixir/native/fluss_nif/src/row_convert.rs b/bindings/elixir/native/fluss_nif/src/row_convert.rs new file mode 100644 index 00000000..3fcdcad2 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/row_convert.rs @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::str::FromStr; + +use fluss::metadata::{Column, DataType}; +use fluss::row::{Date, Decimal, GenericRow, InternalRow, Time, TimestampLtz, TimestampNtz}; +use rustler::types::binary::NewBinary; +use rustler::{Encoder, Env, Term}; + +use crate::atoms; + +pub fn intern_column_atoms<'a>(env: Env<'a>, columns: &[Column]) -> Vec { + columns + .iter() + .map(|col| rustler::Atom::from_str(env, col.name()).expect("valid atom")) + .collect() +} + +pub fn row_to_term<'a>( + env: Env<'a>, + row: &dyn InternalRow, + columns: &[Column], + column_atoms: &[rustler::Atom], +) -> Result, String> { + let pairs: Vec<(Term<'a>, Term<'a>)> = columns + .iter() + .enumerate() + .map(|(i, col)| { + let key = column_atoms[i].encode(env); + let value = field_to_term(env, row, i, col.data_type())?; + Ok((key, value)) + }) + .collect::>()?; + Term::map_from_pairs(env, &pairs).map_err(|_| "failed to create map".to_string()) +} + +fn field_to_term<'a>( + env: Env<'a>, + row: &dyn InternalRow, + pos: usize, + data_type: &DataType, +) -> Result, String> { + if row.is_null_at(pos).map_err(|e| e.to_string())? { + return Ok(atoms::nil().encode(env)); + } + + match data_type { + DataType::Boolean(_) => { + let v = row.get_boolean(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::TinyInt(_) => { + let v = row.get_byte(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::SmallInt(_) => { + let v = row.get_short(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::Int(_) => { + let v = row.get_int(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::BigInt(_) => { + let v = row.get_long(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::Float(_) => { + let v = row.get_float(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::Double(_) => { + let v = row.get_double(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::String(_) => { + let v = row.get_string(pos).map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::Char(ct) => { + let v = row + .get_char(pos, ct.length() as usize) + .map_err(|e| e.to_string())?; + Ok(v.encode(env)) + } + DataType::Bytes(_) => { + let v = row.get_bytes(pos).map_err(|e| e.to_string())?; + let mut bin = NewBinary::new(env, v.len()); + bin.as_mut_slice().copy_from_slice(v); + let binary: rustler::Binary = bin.into(); + Ok(binary.encode(env)) + } + DataType::Binary(bt) => { + let v = row + .get_binary(pos, bt.length()) + .map_err(|e| e.to_string())?; + let mut bin = NewBinary::new(env, v.len()); + bin.as_mut_slice().copy_from_slice(v); + let binary: rustler::Binary = bin.into(); + Ok(binary.encode(env)) + } + DataType::Date(_) => { + let v = row.get_date(pos).map_err(|e| e.to_string())?; + Ok(v.get_inner().encode(env)) + } + DataType::Time(_) => { + let v = row.get_time(pos).map_err(|e| e.to_string())?; + Ok(v.get_inner().encode(env)) + } + DataType::Timestamp(ts) => { + let v = row + .get_timestamp_ntz(pos, ts.precision()) + .map_err(|e| e.to_string())?; + Ok((v.get_millisecond(), v.get_nano_of_millisecond()).encode(env)) + } + DataType::TimestampLTz(ts) => { + let v = row + .get_timestamp_ltz(pos, ts.precision()) + .map_err(|e| e.to_string())?; + Ok((v.get_epoch_millisecond(), v.get_nano_of_millisecond()).encode(env)) + } + DataType::Decimal(dt) => { + let v = row + .get_decimal(pos, dt.precision() as usize, dt.scale() as usize) + .map_err(|e| e.to_string())?; + Ok(v.to_string().encode(env)) + } + _ => Err(format!("unsupported data type: {data_type:?}")), + } +} + +pub fn term_to_row<'a>( + env: Env<'a>, + values: Term<'a>, + columns: &[Column], +) -> Result, String> { + let list: Vec> = values + .decode() + .map_err(|_| "expected a list of values".to_string())?; + if list.len() != columns.len() { + return Err(format!( + "expected {} values, got {}", + columns.len(), + list.len() + )); + } + + let mut row = GenericRow::new(columns.len()); + for (i, (term, col)) in list.iter().zip(columns.iter()).enumerate() { + if term.is_atom() { + if let Ok(atom) = term.decode::() { + if atom == atoms::nil() { + continue; // leave as null + } + } + } + set_field_from_term(env, &mut row, i, *term, col.data_type())?; + } + Ok(row) +} + +fn set_field_from_term<'a>( + _env: Env<'a>, + row: &mut GenericRow<'static>, + pos: usize, + term: Term<'a>, + data_type: &DataType, +) -> Result<(), String> { + match data_type { + DataType::Boolean(_) => { + let v: bool = term.decode().map_err(|_| "expected boolean")?; + row.set_field(pos, v); + } + DataType::TinyInt(_) => { + let v: i8 = term.decode().map_err(|_| "expected integer for tinyint")?; + row.set_field(pos, v); + } + DataType::SmallInt(_) => { + let v: i16 = term + .decode() + .map_err(|_| "expected integer for smallint")?; + row.set_field(pos, v); + } + DataType::Int(_) => { + let v: i32 = term.decode().map_err(|_| "expected integer")?; + row.set_field(pos, v); + } + DataType::BigInt(_) => { + let v: i64 = term.decode().map_err(|_| "expected integer")?; + row.set_field(pos, v); + } + DataType::Date(_) => { + let v: i32 = term.decode().map_err(|_| "expected integer (days since epoch)")?; + row.set_field(pos, Date::new(v)); + } + DataType::Time(_) => { + let v: i32 = term.decode().map_err(|_| "expected integer (millis since midnight)")?; + row.set_field(pos, Time::new(v)); + } + DataType::Timestamp(_) => { + let (millis, nanos): (i64, i32) = term + .decode() + .map_err(|_| "expected {millis, nanos} tuple for timestamp")?; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).map_err(|e| e.to_string())?; + row.set_field(pos, ts); + } + DataType::TimestampLTz(_) => { + let (millis, nanos): (i64, i32) = term + .decode() + .map_err(|_| "expected {millis, nanos} tuple for timestamp_ltz")?; + let ts = TimestampLtz::from_millis_nanos(millis, nanos).map_err(|e| e.to_string())?; + row.set_field(pos, ts); + } + DataType::Float(_) => { + let v: f64 = term.decode().map_err(|_| "expected number for float")?; + row.set_field(pos, v as f32); + } + DataType::Double(_) => { + let v: f64 = term.decode().map_err(|_| "expected number for double")?; + row.set_field(pos, v); + } + DataType::String(_) | DataType::Char(_) => { + let v: String = term.decode().map_err(|_| "expected string")?; + row.set_field(pos, v); + } + DataType::Decimal(dt) => { + let v: String = term.decode().map_err(|_| "expected string for decimal")?; + let bd = bigdecimal::BigDecimal::from_str(&v) + .map_err(|e| format!("failed to parse decimal '{v}': {e}"))?; + let decimal = Decimal::from_big_decimal(bd, dt.precision(), dt.scale()) + .map_err(|e| e.to_string())?; + row.set_field(pos, decimal); + } + DataType::Bytes(_) | DataType::Binary(_) => { + let bin: rustler::Binary = term.decode().map_err(|_| "expected binary")?; + row.set_field(pos, bin.as_slice().to_vec()); + } + _ => return Err(format!("unsupported data type for writing: {data_type:?}")), + } + Ok(()) +} diff --git a/bindings/elixir/native/fluss_nif/src/schema.rs b/bindings/elixir/native/fluss_nif/src/schema.rs new file mode 100644 index 00000000..11f5287c --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/schema.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::to_nif_err; +use fluss::metadata::{DataTypes, Schema, SchemaBuilder, TableDescriptor}; +use rustler::{NifTaggedEnum, ResourceArc}; +use std::sync::Mutex; + +pub struct SchemaBuilderResource(pub Mutex>); +pub struct SchemaResource(pub Schema); +pub struct TableDescriptorResource(pub TableDescriptor); + +impl std::panic::RefUnwindSafe for SchemaBuilderResource {} +impl std::panic::RefUnwindSafe for SchemaResource {} +impl std::panic::RefUnwindSafe for TableDescriptorResource {} + +#[rustler::resource_impl] +impl rustler::Resource for SchemaBuilderResource {} + +#[rustler::resource_impl] +impl rustler::Resource for SchemaResource {} + +#[rustler::resource_impl] +impl rustler::Resource for TableDescriptorResource {} + +/// Fluss data type for NIF interop. +/// +/// Simple types map to atoms: `:int`, `:string`, etc. +/// Parameterized types map to tuples: `{:decimal, 10, 2}`, `{:char, 20}`. +#[derive(NifTaggedEnum)] +pub enum DataType { + Boolean, + Tinyint, + Smallint, + Int, + Bigint, + Float, + Double, + String, + Bytes, + Date, + Time, + Timestamp, + TimestampLtz, + Decimal(u32, u32), + Char(u32), + Binary(usize), +} + +fn to_fluss_type(dt: &DataType) -> fluss::metadata::DataType { + match dt { + DataType::Boolean => DataTypes::boolean(), + DataType::Tinyint => DataTypes::tinyint(), + DataType::Smallint => DataTypes::smallint(), + DataType::Int => DataTypes::int(), + DataType::Bigint => DataTypes::bigint(), + DataType::Float => DataTypes::float(), + DataType::Double => DataTypes::double(), + DataType::String => DataTypes::string(), + DataType::Bytes => DataTypes::bytes(), + DataType::Date => DataTypes::date(), + DataType::Time => DataTypes::time(), + DataType::Timestamp => DataTypes::timestamp(), + DataType::TimestampLtz => DataTypes::timestamp_ltz(), + DataType::Decimal(precision, scale) => DataTypes::decimal(*precision, *scale), + DataType::Char(length) => DataTypes::char(*length), + DataType::Binary(length) => DataTypes::binary(*length), + } +} + +#[rustler::nif] +fn schema_builder_new() -> ResourceArc { + ResourceArc::new(SchemaBuilderResource(Mutex::new(Some(Schema::builder())))) +} + +#[rustler::nif] +fn schema_builder_column( + builder: ResourceArc, + name: String, + data_type: DataType, +) -> Result, rustler::Error> { + let mut guard = builder.0.lock().unwrap(); + let b = guard.take().ok_or_else(|| to_nif_err("schema builder already consumed"))?; + *guard = Some(b.column(&name, to_fluss_type(&data_type))); + drop(guard); + Ok(builder) +} + +#[rustler::nif] +fn schema_builder_primary_key( + builder: ResourceArc, + keys: Vec, +) -> Result, rustler::Error> { + let mut guard = builder.0.lock().unwrap(); + let b = guard.take().ok_or_else(|| to_nif_err("schema builder already consumed"))?; + *guard = Some(b.primary_key(keys)); + drop(guard); + Ok(builder) +} + +#[rustler::nif] +fn schema_builder_build( + builder: ResourceArc, +) -> Result, rustler::Error> { + let mut guard = builder.0.lock().unwrap(); + let b = guard.take().ok_or_else(|| to_nif_err("schema builder already consumed"))?; + let schema = b.build().map_err(to_nif_err)?; + Ok(ResourceArc::new(SchemaResource(schema))) +} + +#[rustler::nif] +fn table_descriptor_new( + schema: ResourceArc, +) -> Result, rustler::Error> { + let descriptor = TableDescriptor::builder() + .schema(schema.0.clone()) + .build() + .map_err(to_nif_err)?; + Ok(ResourceArc::new(TableDescriptorResource(descriptor))) +} + +#[rustler::nif] +fn table_descriptor_with_bucket_count( + schema: ResourceArc, + bucket_count: i32, +) -> Result, rustler::Error> { + let descriptor = TableDescriptor::builder() + .schema(schema.0.clone()) + .distributed_by(Some(bucket_count), vec![]) + .build() + .map_err(to_nif_err)?; + Ok(ResourceArc::new(TableDescriptorResource(descriptor))) +} + +#[rustler::nif] +fn table_descriptor_with_properties( + schema: ResourceArc, + properties: Vec<(String, String)>, +) -> Result, rustler::Error> { + let mut builder = TableDescriptor::builder().schema(schema.0.clone()); + for (key, value) in properties { + builder = builder.property(&key, &value); + } + let descriptor = builder.build().map_err(to_nif_err)?; + Ok(ResourceArc::new(TableDescriptorResource(descriptor))) +} diff --git a/bindings/elixir/native/fluss_nif/src/table.rs b/bindings/elixir/native/fluss_nif/src/table.rs new file mode 100644 index 00000000..621f2f96 --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/table.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::to_nif_err; +use crate::connection::ConnectionResource; +use crate::RUNTIME; +use fluss::client::{FlussConnection, FlussTable, Metadata}; +use fluss::metadata::{Column, TableInfo, TablePath}; +use rustler::ResourceArc; +use std::sync::Arc; + +/// Holds the data needed to reconstruct FlussTable (which has a lifetime +/// tied to FlussConnection). We store the Arc to keep +/// it alive and reconstruct short-lived FlussTable instances on demand. +pub struct TableResource { + pub connection: Arc, + pub metadata: Arc, + pub table_info: TableInfo, +} + +impl std::panic::RefUnwindSafe for TableResource {} + +#[rustler::resource_impl] +impl rustler::Resource for TableResource {} + +impl TableResource { + pub fn columns(&self) -> &[Column] { + self.table_info.schema.columns() + } + + pub fn with_table(&self, f: impl FnOnce(&FlussTable<'_>) -> T) -> T { + let table = FlussTable::new( + &self.connection, + self.metadata.clone(), + self.table_info.clone(), + ); + f(&table) + } +} + +#[rustler::nif(schedule = "DirtyIo")] +fn table_get( + conn: ResourceArc, + database_name: String, + table_name: String, +) -> Result, rustler::Error> { + let path = TablePath::new(&database_name, &table_name); + let table = RUNTIME + .block_on(conn.0.get_table(&path)) + .map_err(to_nif_err)?; + + Ok(ResourceArc::new(TableResource { + connection: conn.0.clone(), + metadata: table.metadata().clone(), + table_info: table.get_table_info().clone(), + })) +} + +#[rustler::nif] +fn table_has_primary_key(table: ResourceArc) -> bool { + table.table_info.has_primary_key() +} + +#[rustler::nif] +fn table_column_names(table: ResourceArc) -> Vec { + table + .columns() + .iter() + .map(|c| c.name().to_string()) + .collect() +} diff --git a/bindings/elixir/native/fluss_nif/src/write_handle.rs b/bindings/elixir/native/fluss_nif/src/write_handle.rs new file mode 100644 index 00000000..72ead9fa --- /dev/null +++ b/bindings/elixir/native/fluss_nif/src/write_handle.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::atoms::{self, to_nif_err}; +use crate::RUNTIME; +use fluss::client::WriteResultFuture; +use rustler::{Atom, ResourceArc}; +use std::sync::Mutex; + +pub struct WriteHandleResource(Mutex>); + +impl std::panic::RefUnwindSafe for WriteHandleResource {} + +#[rustler::resource_impl] +impl rustler::Resource for WriteHandleResource {} + +impl WriteHandleResource { + pub fn new(future: WriteResultFuture) -> Self { + Self(Mutex::new(Some(future))) + } +} + +#[rustler::nif(schedule = "DirtyIo")] +fn write_handle_wait(handle: ResourceArc) -> Result { + let future = handle + .0 + .lock() + .map_err(|e| to_nif_err(format!("lock poisoned: {e}")))? + .take() + .ok_or_else(|| to_nif_err("WriteHandle already consumed"))?; + + RUNTIME.block_on(future).map_err(to_nif_err)?; + Ok(atoms::ok()) +} diff --git a/bindings/elixir/test/fluss_test.exs b/bindings/elixir/test/fluss_test.exs new file mode 100644 index 00000000..2304cb06 --- /dev/null +++ b/bindings/elixir/test/fluss_test.exs @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule FlussTest do + use ExUnit.Case + + describe "Config" do + test "creates config with bootstrap servers" do + config = Fluss.Config.new("localhost:9123") + assert Fluss.Config.get_bootstrap_servers(config) == "localhost:9123" + end + + test "default config uses localhost" do + config = Fluss.Config.default() + assert Fluss.Config.get_bootstrap_servers(config) == "127.0.0.1:9123" + end + + test "config chaining" do + config = + Fluss.Config.default() + |> Fluss.Config.set_bootstrap_servers("host1:9123,host2:9123") + + assert Fluss.Config.get_bootstrap_servers(config) == "host1:9123,host2:9123" + end + end + + describe "Schema" do + test "builds a simple log table schema" do + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("ts", :bigint) + |> Fluss.Schema.column("message", :string) + |> Fluss.Schema.build!() + + assert is_reference(schema) + end + + test "builds a schema with all simple types" do + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("a", :boolean) + |> Fluss.Schema.column("b", :tinyint) + |> Fluss.Schema.column("c", :smallint) + |> Fluss.Schema.column("d", :int) + |> Fluss.Schema.column("e", :bigint) + |> Fluss.Schema.column("f", :float) + |> Fluss.Schema.column("g", :double) + |> Fluss.Schema.column("h", :string) + |> Fluss.Schema.column("i", :bytes) + |> Fluss.Schema.column("j", :date) + |> Fluss.Schema.column("k", :time) + |> Fluss.Schema.column("l", :timestamp) + |> Fluss.Schema.column("m", :timestamp_ltz) + |> Fluss.Schema.build!() + + assert is_reference(schema) + end + + test "builds a schema with parameterized types" do + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("amount", {:decimal, 10, 2}) + |> Fluss.Schema.column("code", {:char, 5}) + |> Fluss.Schema.column("data", {:binary, 16}) + |> Fluss.Schema.build!() + + assert is_reference(schema) + end + end + + describe "TableDescriptor" do + test "creates descriptor from schema" do + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + assert is_reference(descriptor) + end + + test "creates descriptor with bucket count" do + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema, bucket_count: 3) + assert is_reference(descriptor) + end + end + + describe "earliest_offset/0" do + test "returns -2" do + assert Fluss.earliest_offset() == -2 + end + end +end diff --git a/bindings/elixir/test/integration/log_table_test.exs b/bindings/elixir/test/integration/log_table_test.exs new file mode 100644 index 00000000..6dec7ed4 --- /dev/null +++ b/bindings/elixir/test/integration/log_table_test.exs @@ -0,0 +1,407 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Integration.LogTableTest do + use ExUnit.Case, async: false + + alias Fluss.Test.Cluster + + @moduletag :integration + + @database "fluss" + + setup_all do + case Cluster.ensure_started() do + {:ok, servers} -> + config = Fluss.Config.new(servers) + + # Wait for cluster to be fully ready (connection + admin working) + {conn, admin} = connect_with_retry(config, 90) + + %{conn: conn, admin: admin, config: config} + + {:error, reason} -> + raise "Failed to start Fluss cluster: #{reason}" + end + end + + describe "append and scan" do + test "append rows and scan with log scanner", %{conn: conn, admin: admin} do + table_name = "ex_test_append_and_scan_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("c1", :int) + |> Fluss.Schema.column("c2", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + # Append 6 rows + for {c1, c2} <- [{1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}] do + {:ok, _} = Fluss.AppendWriter.append(writer, [c1, c2]) + end + + :ok = Fluss.AppendWriter.flush(writer) + + # Scan all records + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 6) + + assert length(records) == 6 + + sorted = Enum.sort_by(records, fn r -> r[:row][:c1] end) + + for {record, i} <- Enum.with_index(sorted, 1) do + assert record[:row][:c1] == i + assert record[:row][:c2] == "a#{i}" + assert record[:change_type] == :append_only + end + + # Unsubscribe should not error + :ok = Fluss.LogScanner.unsubscribe(scanner, 0) + + cleanup_table(admin, table_name) + end + + test "append with nil values", %{conn: conn, admin: admin} do + table_name = "ex_test_append_nil_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("name", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + {:ok, _} = Fluss.AppendWriter.append(writer, [1, nil]) + {:ok, _} = Fluss.AppendWriter.append(writer, [2, "present"]) + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 2) + assert length(records) == 2 + + sorted = Enum.sort_by(records, fn r -> r[:row][:id] end) + assert Enum.at(sorted, 0)[:row][:name] == nil + assert Enum.at(sorted, 1)[:row][:name] == "present" + + cleanup_table(admin, table_name) + end + end + + describe "multiple data types" do + test "int, bigint, float, double, string, boolean", %{conn: conn, admin: admin} do + table_name = "ex_test_data_types_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("a_int", :int) + |> Fluss.Schema.column("b_bigint", :bigint) + |> Fluss.Schema.column("c_float", :float) + |> Fluss.Schema.column("d_double", :double) + |> Fluss.Schema.column("e_string", :string) + |> Fluss.Schema.column("f_bool", :boolean) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + {:ok, _} = + Fluss.AppendWriter.append(writer, [ + 42, + 1_000_000_000_000, + 3.14, + 2.718281828, + "hello", + true + ]) + + {:ok, _} = Fluss.AppendWriter.append(writer, [-1, -999, 0.0, -1.5, "", false]) + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 2) + assert length(records) == 2 + + sorted = Enum.sort_by(records, fn r -> r[:row][:a_int] end) + row1 = Enum.at(sorted, 0)[:row] + row2 = Enum.at(sorted, 1)[:row] + + assert row1[:a_int] == -1 + assert row1[:b_bigint] == -999 + assert row1[:e_string] == "" + assert row1[:f_bool] == false + + assert row2[:a_int] == 42 + assert row2[:b_bigint] == 1_000_000_000_000 + assert row2[:e_string] == "hello" + assert row2[:f_bool] == true + + cleanup_table(admin, table_name) + end + end + + describe "subscribe_buckets" do + test "subscribe to multiple buckets at once", %{conn: conn, admin: admin} do + table_name = "ex_test_subscribe_buckets_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("val", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema, bucket_count: 3) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + for i <- 1..9 do + {:ok, _} = Fluss.AppendWriter.append(writer, [i, "v#{i}"]) + end + + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + earliest = Fluss.earliest_offset() + + :ok = + Fluss.LogScanner.subscribe_buckets(scanner, [ + {0, earliest}, + {1, earliest}, + {2, earliest} + ]) + + records = poll_records(scanner, 9) + assert length(records) == 9 + + ids = records |> Enum.map(fn r -> r[:row][:id] end) |> Enum.sort() + assert ids == Enum.to_list(1..9) + + cleanup_table(admin, table_name) + end + end + + describe "admin operations" do + test "create and drop database", %{admin: admin} do + db_name = "ex_test_db_#{:rand.uniform(100_000)}" + :ok = Fluss.Admin.create_database(admin, db_name, true) + + {:ok, databases} = Fluss.Admin.list_databases(admin) + assert db_name in databases + + :ok = Fluss.Admin.drop_database(admin, db_name, true) + end + + test "list tables", %{admin: admin} do + table_name = "ex_test_list_tables_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + {:ok, tables} = Fluss.Admin.list_tables(admin, @database) + assert table_name in tables + + cleanup_table(admin, table_name) + end + + test "table metadata", %{conn: conn, admin: admin} do + table_name = "ex_test_table_meta_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("name", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + assert Fluss.Table.has_primary_key?(table) == false + assert Fluss.Table.column_names(table) == ["id", "name"] + + cleanup_table(admin, table_name) + end + end + + describe "scan from offset" do + test "subscribe from specific offset skips earlier records", %{conn: conn, admin: admin} do + table_name = "ex_test_scan_offset_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + for i <- 1..5 do + {:ok, _} = Fluss.AppendWriter.append(writer, [i]) + end + + :ok = Fluss.AppendWriter.flush(writer) + + # Subscribe from offset 3, should skip first 3 records + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, 3) + + records = poll_records(scanner, 2) + assert length(records) == 2 + + ids = records |> Enum.map(fn r -> r[:row][:id] end) |> Enum.sort() + assert ids == [4, 5] + + cleanup_table(admin, table_name) + end + end + + describe "multiple flushes" do + test "append, flush, append more, flush, scan all", %{conn: conn, admin: admin} do + table_name = "ex_test_multi_flush_#{:rand.uniform(100_000)}" + cleanup_table(admin, table_name) + + schema = + Fluss.Schema.build() + |> Fluss.Schema.column("id", :int) + |> Fluss.Schema.column("batch", :string) + |> Fluss.Schema.build!() + + descriptor = Fluss.TableDescriptor.new!(schema) + :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, false) + + table = Fluss.Table.get!(conn, @database, table_name) + writer = Fluss.AppendWriter.new!(table) + + # First batch + {:ok, _} = Fluss.AppendWriter.append(writer, [1, "first"]) + {:ok, _} = Fluss.AppendWriter.append(writer, [2, "first"]) + :ok = Fluss.AppendWriter.flush(writer) + + # Second batch + {:ok, _} = Fluss.AppendWriter.append(writer, [3, "second"]) + {:ok, _} = Fluss.AppendWriter.append(writer, [4, "second"]) + :ok = Fluss.AppendWriter.flush(writer) + + scanner = Fluss.LogScanner.new!(table) + :ok = Fluss.LogScanner.subscribe(scanner, 0, Fluss.earliest_offset()) + + records = poll_records(scanner, 4) + assert length(records) == 4 + + sorted = Enum.sort_by(records, fn r -> r[:row][:id] end) + assert Enum.at(sorted, 0)[:row][:batch] == "first" + assert Enum.at(sorted, 1)[:row][:batch] == "first" + assert Enum.at(sorted, 2)[:row][:batch] == "second" + assert Enum.at(sorted, 3)[:row][:batch] == "second" + + cleanup_table(admin, table_name) + end + end + + defp poll_records(scanner, expected_count, timeout_ms \\ 10_000) do + deadline = System.monotonic_time(:millisecond) + timeout_ms + do_poll(scanner, expected_count, deadline, []) + end + + defp do_poll(_scanner, expected_count, _deadline, acc) when length(acc) >= expected_count do + acc + end + + defp do_poll(scanner, expected_count, deadline, acc) do + remaining = deadline - System.monotonic_time(:millisecond) + + if remaining <= 0 do + acc + else + :ok = Fluss.LogScanner.poll(scanner, min(5_000, remaining)) + + receive do + {:fluss_records, records} -> + do_poll(scanner, expected_count, deadline, acc ++ records) + + {:fluss_poll_error, _reason} -> + do_poll(scanner, expected_count, deadline, acc) + after + min(6_000, remaining) -> + acc + end + end + end + + defp cleanup_table(admin, table_name) do + Fluss.Admin.drop_table(admin, @database, table_name, true) + end + + defp connect_with_retry(config, timeout_s) do + deadline = System.monotonic_time(:second) + timeout_s + do_connect_retry(config, deadline, nil) + end + + defp do_connect_retry(config, deadline, last_error) do + if System.monotonic_time(:second) >= deadline do + raise "Could not connect to Fluss cluster: #{inspect(last_error)}" + end + + try do + conn = Fluss.Connection.new!(config) + admin = Fluss.Admin.new!(conn) + {:ok, _databases} = Fluss.Admin.list_databases(admin) + {conn, admin} + rescue + e -> + Process.sleep(2_000) + do_connect_retry(config, deadline, e) + end + end +end diff --git a/bindings/elixir/test/support/cluster.ex b/bindings/elixir/test/support/cluster.ex new file mode 100644 index 00000000..59598b4e --- /dev/null +++ b/bindings/elixir/test/support/cluster.ex @@ -0,0 +1,211 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.Test.Cluster do + @moduledoc false + + @fluss_image "apache/fluss" + @fluss_version "0.9.0-incubating" + + @network_name "fluss-elixir-test-network" + @zookeeper_name "zookeeper-elixir-test" + @coordinator_name "coordinator-server-elixir-test" + @tablet_server_name "tablet-server-elixir-test" + + # Same fixed ports used by Python/C++ integration tests. + @coordinator_sasl_port 9123 + @coordinator_plain_port 9223 + @tablet_sasl_port 9124 + @tablet_plain_port 9224 + + def bootstrap_servers, do: "127.0.0.1:#{@coordinator_plain_port}" + + def ensure_started do + case System.get_env("FLUSS_BOOTSTRAP_SERVERS") do + nil -> start_cluster() + servers -> {:ok, servers} + end + end + + def stop do + for name <- [@tablet_server_name, @coordinator_name, @zookeeper_name] do + System.cmd("docker", ["rm", "-f", name], stderr_to_stdout: true) + end + + System.cmd("docker", ["network", "rm", @network_name], stderr_to_stdout: true) + :ok + end + + defp start_cluster do + if port_open?(@coordinator_plain_port) do + IO.puts("Reusing existing Fluss cluster on port #{@coordinator_plain_port}") + {:ok, bootstrap_servers()} + else + do_start_cluster() + end + end + + defp do_start_cluster do + IO.puts("Starting Fluss cluster via Docker...") + + # Remove any leftover containers from previous runs + for name <- [@tablet_server_name, @coordinator_name, @zookeeper_name] do + System.cmd("docker", ["rm", "-f", name], stderr_to_stdout: true) + end + + System.cmd("docker", ["network", "create", @network_name], stderr_to_stdout: true) + + sasl_jaas = + ~s(org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin="admin-secret" user_alice="alice-secret";) + + coordinator_props = + Enum.join( + [ + "zookeeper.address: #{@zookeeper_name}:2181", + "bind.listeners: INTERNAL://#{@coordinator_name}:0, CLIENT://#{@coordinator_name}:9123, PLAIN_CLIENT://#{@coordinator_name}:9223", + "advertised.listeners: CLIENT://localhost:#{@coordinator_sasl_port}, PLAIN_CLIENT://localhost:#{@coordinator_plain_port}", + "internal.listener.name: INTERNAL", + "security.protocol.map: CLIENT:sasl", + "security.sasl.enabled.mechanisms: plain", + "security.sasl.plain.jaas.config: #{sasl_jaas}", + "netty.server.num-network-threads: 1", + "netty.server.num-worker-threads: 3" + ], + "\n" + ) + + tablet_props = + Enum.join( + [ + "zookeeper.address: #{@zookeeper_name}:2181", + "bind.listeners: INTERNAL://#{@tablet_server_name}:0, CLIENT://#{@tablet_server_name}:9123, PLAIN_CLIENT://#{@tablet_server_name}:9223", + "advertised.listeners: CLIENT://localhost:#{@tablet_sasl_port}, PLAIN_CLIENT://localhost:#{@tablet_plain_port}", + "internal.listener.name: INTERNAL", + "security.protocol.map: CLIENT:sasl", + "security.sasl.enabled.mechanisms: plain", + "security.sasl.plain.jaas.config: #{sasl_jaas}", + "tablet-server.id: 0", + "netty.server.num-network-threads: 1", + "netty.server.num-worker-threads: 3" + ], + "\n" + ) + + docker_run([ + "--name", + @zookeeper_name, + "--network", + @network_name, + "-d", + "zookeeper:3.9.2" + ]) + + docker_run([ + "--name", + @coordinator_name, + "--network", + @network_name, + "-p", + "#{@coordinator_sasl_port}:9123", + "-p", + "#{@coordinator_plain_port}:9223", + "-e", + "FLUSS_PROPERTIES=#{coordinator_props}", + "-d", + "#{@fluss_image}:#{@fluss_version}", + "coordinatorServer" + ]) + + docker_run([ + "--name", + @tablet_server_name, + "--network", + @network_name, + "-p", + "#{@tablet_sasl_port}:9123", + "-p", + "#{@tablet_plain_port}:9223", + "-e", + "FLUSS_PROPERTIES=#{tablet_props}", + "-d", + "#{@fluss_image}:#{@fluss_version}", + "tabletServer" + ]) + + all_ports = [@coordinator_plain_port, @tablet_plain_port] + + if wait_for_ports(all_ports, 90) do + IO.puts("Fluss cluster started successfully.") + {:ok, bootstrap_servers()} + else + {:error, "Cluster ports did not become ready within timeout"} + end + end + + defp docker_run(args) do + {output, code} = System.cmd("docker", ["run" | args], stderr_to_stdout: true) + + if code != 0 do + IO.puts("Docker run warning (code #{code}): #{output}") + end + end + + defp wait_for_ports(ports, timeout_s) do + deadline = System.monotonic_time(:second) + timeout_s + + Enum.all?(ports, fn port -> + remaining = deadline - System.monotonic_time(:second) + remaining > 0 and wait_for_port(port, remaining) + end) + end + + defp wait_for_port(port, timeout_s) do + deadline = System.monotonic_time(:second) + timeout_s + + Stream.repeatedly(fn -> + case :gen_tcp.connect(~c"localhost", port, [], 1000) do + {:ok, socket} -> + :gen_tcp.close(socket) + :ok + + {:error, _} -> + Process.sleep(1000) + :retry + end + end) + |> Enum.reduce_while(false, fn + :ok, _acc -> + {:halt, true} + + :retry, _acc -> + if System.monotonic_time(:second) >= deadline, + do: {:halt, false}, + else: {:cont, false} + end) + end + + defp port_open?(port) do + case :gen_tcp.connect(~c"localhost", port, [], 1000) do + {:ok, socket} -> + :gen_tcp.close(socket) + true + + {:error, _} -> + false + end + end +end diff --git a/bindings/elixir/test/test_helper.exs b/bindings/elixir/test/test_helper.exs new file mode 100644 index 00000000..b15b1f44 --- /dev/null +++ b/bindings/elixir/test/test_helper.exs @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Exclude integration tests by default (they need a Docker cluster). +# Run with: mix test --include integration +ExUnit.start(exclude: [:integration]) + +# Stop Docker containers after all tests finish (matches Python's pytest_unconfigure). +ExUnit.after_suite(fn _ -> + unless System.get_env("FLUSS_BOOTSTRAP_SERVERS") do + Fluss.Test.Cluster.stop() + end +end) From 223cf8373ffc1b9dfd7589ddb9a34617cdc14c4e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 28 Mar 2026 23:44:36 +0000 Subject: [PATCH 2/3] fix mix.lock --- .licenserc.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.licenserc.yaml b/.licenserc.yaml index a5b1f76c..09df5650 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -26,6 +26,7 @@ header: - 'NOTICE' - 'DISCLAIMER' - 'bindings/python/fluss/py.typed' + - '**/mix.lock' - 'website/**' - '**/*.md' - '**/DEPENDENCIES.*.tsv' From db1e6db4ce758b6c06ff03f6615802a197a27153 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 5 Apr 2026 19:07:41 +0100 Subject: [PATCH 3/3] fluss_nif license --- bindings/elixir/native/fluss_nif/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings/elixir/native/fluss_nif/Cargo.toml b/bindings/elixir/native/fluss_nif/Cargo.toml index 90858606..95296d1c 100644 --- a/bindings/elixir/native/fluss_nif/Cargo.toml +++ b/bindings/elixir/native/fluss_nif/Cargo.toml @@ -19,6 +19,7 @@ name = "fluss_nif" version = "0.1.0" edition = "2024" +license.workspace = true [lib] name = "fluss_nif"