Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ if(TPCH_ENABLE_LANCE AND Lance_FOUND)
list(APPEND CARGO_BUILD_CMD --target "${RUST_TARGET}")
endif()

# io_uring support is compiled in by default (see Cargo.toml default features).
# Runtime activation happens via the --lance-io-uring CLI flag, not a build switch.

add_custom_command(
OUTPUT "${LANCE_FFI_LIB_FINAL}"
COMMAND ${CMAKE_COMMAND} -E echo "Building Lance FFI library with Rust cargo ..."
Expand All @@ -304,13 +307,18 @@ if(TPCH_ENABLE_LANCE AND Lance_FOUND)
DEPENDS "${LANCE_FFI_DIR}/Cargo.toml"
"${LANCE_FFI_DIR}/Cargo.lock"
"${LANCE_FFI_DIR}/src/lib.rs"
"${LANCE_FFI_DIR}/src/io_uring_store.rs"
COMMENT "Compiling Lance FFI library with Rust (target: ${RUST_TARGET})"
VERBATIM
)

add_custom_target(lance_ffi ALL DEPENDS "${LANCE_FFI_LIB_FINAL}")
message(STATUS "Rust FFI build configured for Lance support")
message(STATUS " Rust target directory: ${LANCE_FFI_BUILD_DIR}")
# io_uring symbols are only present when we build from source (Cargo.toml
# has io-uring as a default feature). Pre-compiled CI libraries may not
# have them, so gate the C++ call site with this flag.
set(LANCE_FFI_HAS_IO_URING TRUE)
endif()

# Import the Rust library as an external target
Expand Down Expand Up @@ -484,6 +492,9 @@ endif()

# Publish the Lance feature macros on tpch_core. They are PUBLIC so that every
# target linking tpch_core compiles the Lance headers with the same macros.
if(TPCH_ENABLE_LANCE)
  target_compile_definitions(tpch_core
    PUBLIC
      TPCH_ENABLE_LANCE
      # Added only when LANCE_FFI_HAS_IO_URING is truthy; an unset or false
      # value makes the expression expand to nothing.
      $<$<BOOL:${LANCE_FFI_HAS_IO_URING}>:TPCH_LANCE_IO_URING>
  )
endif()

if(TPCH_ENABLE_ASYNC_IO AND Uring_FOUND)
Expand Down
14 changes: 14 additions & 0 deletions include/tpch/lance_ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ int lance_writer_set_write_params(
long long max_bytes_per_file,
int skip_auto_cleanup);

#ifdef TPCH_LANCE_IO_URING
/**
 * Switch the io_uring write path of a writer on or off.
 *
 * Call this before the first batch is written. The declaration is only
 * visible when TPCH_LANCE_IO_URING is defined, which CMake does when
 * lance_ffi is built from source with the io-uring Cargo feature.
 *
 * @param writer  Pointer to LanceWriter from lance_writer_create()
 * @param enabled 1 to enable io_uring writes, 0 to disable
 * @return 0 on success, non-zero on failure
 */
int lance_writer_enable_io_uring(LanceWriter* writer, int enabled);
#endif

/**
* Write a batch of records to the Lance dataset.
*
Expand Down
11 changes: 11 additions & 0 deletions include/tpch/lance_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ class LanceWriter : public WriterInterface {
*/
void set_stream_queue_depth(size_t depth) { stream_queue_depth_ = depth; }

#ifdef TPCH_LANCE_IO_URING
/**
 * Request the io_uring write path (Linux only, requires the io-uring
 * feature to be compiled in). Must be called before the first batch is
 * written. This setter only records the choice in use_io_uring_.
 *
 * @param enabled true to use io_uring for writes, false otherwise
 */
void enable_io_uring(bool enabled) {
    use_io_uring_ = enabled;
}
#endif

private:
std::string dataset_path_;
std::string dataset_name_;
Expand All @@ -124,6 +132,9 @@ class LanceWriter : public WriterInterface {
int64_t max_rows_per_group_ = 0;
int64_t max_bytes_per_file_ = 0;
bool skip_auto_cleanup_ = false;
#ifdef TPCH_LANCE_IO_URING
bool use_io_uring_ = false;
#endif

size_t stream_queue_depth_ = 16;
std::shared_ptr<StreamState> stream_state_;
Expand Down
26 changes: 24 additions & 2 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct Options {
long lance_rows_per_group = 0;
long lance_max_bytes_per_file = 0;
bool lance_skip_auto_cleanup = false;
bool lance_io_uring = false;
long lance_stream_queue = 16;
std::string lance_stats_level;
double lance_cardinality_sample_rate = 1.0; // Phase 3.1: Sampling-based cardinality
Expand All @@ -67,6 +68,7 @@ constexpr int OPT_LANCE_SKIP_AUTO_CLEANUP = 1003;
constexpr int OPT_LANCE_STREAM_QUEUE = 1004;
constexpr int OPT_LANCE_STATS_LEVEL = 1005;
constexpr int OPT_LANCE_CARDINALITY_SAMPLE_RATE = 1006; // Phase 3.1
constexpr int OPT_LANCE_IO_URING = 1007;

void print_usage(const char* prog) {
std::cout << "Usage: " << prog << " [options]\n"
Expand Down Expand Up @@ -107,6 +109,7 @@ void print_usage(const char* prog) {
<< " --lance-cardinality-sample-rate <0.0-1.0> Cardinality sampling rate (Phase 3.1)\n"
<< " Controls HyperLogLog sampling: 1.0=100% (default),\n"
<< " 0.5=50%, 0.1=10%. Smaller rates = faster writes.\n"
<< " --lance-io-uring Use io_uring for Lance disk writes (Linux only)\n"
#endif
<< " --verbose Verbose output\n"
<< " --help Show this help message\n";
Expand All @@ -133,6 +136,7 @@ Options parse_args(int argc, char* argv[]) {
{"lance-stream-queue", required_argument, nullptr, OPT_LANCE_STREAM_QUEUE},
{"lance-stats-level", required_argument, nullptr, OPT_LANCE_STATS_LEVEL},
{"lance-cardinality-sample-rate", required_argument, nullptr, OPT_LANCE_CARDINALITY_SAMPLE_RATE},
{"lance-io-uring", no_argument, nullptr, OPT_LANCE_IO_URING},
#endif
#ifdef TPCH_ENABLE_ASYNC_IO
{"async-io", no_argument, nullptr, 'a'},
Expand Down Expand Up @@ -198,6 +202,9 @@ Options parse_args(int argc, char* argv[]) {
exit(1);
}
break;
case OPT_LANCE_IO_URING:
opts.lance_io_uring = true;
break;
// case 'Z': DISABLED - true-zero-copy removed, use --zero-copy instead
#ifdef TPCH_ENABLE_ASYNC_IO
case 'a':
Expand Down Expand Up @@ -1152,10 +1159,15 @@ int generate_all_tables_parallel_v2(const Options& opts) {
auto writer = create_writer(opts.format, output_path);

#ifdef TPCH_ENABLE_LANCE
if (opts.zero_copy || opts.true_zero_copy) {
if (auto lance_writer = dynamic_cast<tpch::LanceWriter*>(writer.get())) {
if (auto lance_writer = dynamic_cast<tpch::LanceWriter*>(writer.get())) {
if (opts.zero_copy || opts.true_zero_copy) {
lance_writer->enable_streaming_write(true);
}
#ifdef TPCH_LANCE_IO_URING
if (opts.lance_io_uring) {
lance_writer->enable_io_uring(true);
}
#endif
}
#endif

Expand Down Expand Up @@ -1422,6 +1434,16 @@ int main(int argc, char* argv[]) {
std::cout << "Lance streaming write mode enabled (zero-copy)\n";
}
}

// Honour --lance-io-uring: switch the writer to the io_uring write path and,
// in verbose mode, report that it was done.
#ifdef TPCH_LANCE_IO_URING
if (opts.lance_io_uring) {
    lance_writer->enable_io_uring(true);
}
if (opts.lance_io_uring && opts.verbose) {
    std::cout << "Lance io_uring write path enabled\n";
}
#endif
}
#endif

Expand Down
10 changes: 10 additions & 0 deletions src/writers/lance_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ void LanceWriter::initialize_lance_dataset(
throw std::runtime_error("Failed to create Lance writer via FFI");
}

// Forward a pending io_uring request (--lance-io-uring flag) to the FFI writer.
// The FFI call is made only when use_io_uring_ is set; a non-zero status is
// reported as a runtime_error.
#ifdef TPCH_LANCE_IO_URING
if (use_io_uring_ &&
    lance_writer_enable_io_uring(reinterpret_cast<::LanceWriter*>(rust_writer_), 1) != 0) {
    throw std::runtime_error("Failed to enable io_uring for Lance writer");
}
#endif

if (max_rows_per_file_ > 0 || max_rows_per_group_ > 0 || max_bytes_per_file_ > 0 || skip_auto_cleanup_) {
int result = lance_writer_set_write_params(
reinterpret_cast<::LanceWriter*>(rust_writer_),
Expand Down
16 changes: 16 additions & 0 deletions third_party/lance-ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions third_party/lance-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ authors = ["Timur Safin <timur.safin@gmail.com>"]
# Rust 1.93.0 now available - enabling full Arrow/Lance support
# Implements actual Lance dataset writing in Rust FFI

[features]
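# The io-uring feature is enabled by default so the runtime --lance-io-uring
# flag works without needing a separate binary build.
# NOTE(review): this default is not target-gated here; confirm non-Linux builds
# either disable it or that the io_uring code is cfg-gated in the sources.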
default = ["io-uring"]
io-uring = ["dep:io-uring"]

[dependencies]
arrow = { version = "57", features = ["ffi"] }
lance = { path = "../lance/rust/lance" }
lance-io = { path = "../lance/rust/lance-io" }
object_store = "0.12"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "macros"] }
tokio-stream = "0.1"
futures = "0.3"
serde_json = "1.0"
libc = "0.2"
async-trait = "0.1"
bytes = "1"
io-uring = { version = "0.7", optional = true }

[lib]
crate-type = ["staticlib"]
Expand Down
Loading