Skip to content

Add ORC dict encoding + io_uring write paths for Parquet and ORC#11

Open
tsafin wants to merge 2 commits intomasterfrom
tsafin/orc_parquet_uring
Open

Add ORC dict encoding + io_uring write paths for Parquet and ORC#11
tsafin wants to merge 2 commits intomasterfrom
tsafin/orc_parquet_uring

Conversation

@tsafin
Copy link
Owner

@tsafin tsafin commented Mar 5, 2026

Summary

  • ORC dictionary encoding: Set WriterOptions::setDictionaryKeySizeThreshold(1.0) so all STRING columns use dictionary encoding. TPC-H string columns have 2–40 unique values, making dictionary always beneficial (smaller files, faster writes).
  • io_uring for Parquet (--parquet-io-uring, requires --zero-copy): IoUringOutputStream implements arrow::io::OutputStream with 8-buffer circular staging ring (7 SQEs in-flight). Replaces FileOutputStream in init_file_writer() when enabled.
  • io_uring for ORC (--orc-io-uring): OrcIoUringStream implements orc::OutputStream with same 8-buffer design. Replaces orc::writeLocalFile() when enabled.
  • C++ io_uring hardening (follow-up commit): matches Rust io_uring_store design — IORING_SETUP_ATTACH_WQ for shared kernel worker pool, sysfs queue-depth calibration (/sys/dev/block/.../queue/nr_requests), submit_and_wait(1) single-syscall drain, 512 KB chunk size.
  • Both io_uring classes are gated on #ifdef TPCH_ENABLE_ASYNC_IO so builds without liburing continue to work.

Performance results (lineitem SF=10 --zero-copy)

Format Before After Delta
Parquet io_uring 0.64 M r/s 0.70 M r/s +9%
ORC io_uring 0.93–1.01 M r/s new

Test plan

  • Build with TPCH_ENABLE_ORC=ON and TPCH_ENABLE_ASYNC_IO=ON
  • Smoke test: ./tpch_benchmark --use-dbgen --format parquet --table lineitem --scale-factor 1 --zero-copy --parquet-io-uring
  • Smoke test: ./tpch_benchmark --use-dbgen --format orc --table lineitem --scale-factor 1 --orc-io-uring
  • Build with TPCH_ENABLE_ORC=OFF to verify ifdefs compile cleanly
  • Verify ORC file size reduction from dictionary encoding

🤖 Generated with Claude Code

tsafin and others added 2 commits March 5, 2026 01:37
ORC dictionary encoding:
- Set WriterOptions::setDictionaryKeySizeThreshold(1.0) so all STRING
  columns use dictionary encoding. TPC-H string cols have 2-40 unique
  values, making dictionary always beneficial (smaller files, faster writes).

io_uring streaming for Parquet (--parquet-io-uring, requires --zero-copy):
- Add IoUringOutputStream implementing arrow::io::OutputStream with
  double-buffering: fills current_ while io_uring drains pending_.
  Replaces FileOutputStream in init_file_writer() when enabled.

io_uring writes for ORC (--orc-io-uring):
- Add OrcIoUringStream implementing orc::OutputStream with same
  double-buffer design. Replaces orc::writeLocalFile() when enabled.
  Stored as opaque void* member following existing ORCWriter pattern.

Both io_uring classes gated on #ifdef TPCH_ENABLE_ASYNC_IO so builds
without liburing continue to work. New CLI flags also gated accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… 512KB)

Four changes to close the gap with the Rust io_uring_store implementation:

1. IORING_SETUP_ATTACH_WQ (io_uring_context.cpp)
   Process-global anchor ring via std::call_once; all subsequent rings attach
   to share one kernel async-worker thread pool.  Mirrors Rust's OnceLock
   anchor fd, reducing Hyper-V IOThread scheduler pressure on WSL2.

2. Sysfs queue-depth calibration (io_uring_context.cpp)
   AsyncIOContext::calibrate_queue_depth(path) probes
   /sys/dev/block/MAJOR:MINOR/queue/nr_requests and returns nr_requests/2
   clamped to [8,128].  Replaces hardcoded QD=256.  Streams now call this
   in their constructors, eliminating the queue_depth constructor parameter.

3. submit_and_wait(1) in drain path (io_uring_context.cpp + async_io.hpp)
   New AsyncIOContext::submit_and_wait(min) wraps io_uring_submit_and_wait()
   — single syscall instead of separate submit_queued() + wait_completions().
   Used in IoUringOutputStream and OrcIoUringStream when the buffer pool fills.

4. 8-buffer circular ring with 512 KB chunks (parquet_writer.cpp, orc_writer.cpp)
   Replaces double-buffer (1 SQE in-flight) with kNumBufs=8 circular staging
   ring: 7 SQEs in-flight at steady state while filling the 8th.  Chunk size
   reduced from 1 MB to 512 KB to match Rust CHUNK_SIZE.

Result (lineitem SF=10 --zero-copy):
  Parquet io_uring: 0.64 → 0.70 M rows/s (+9%)
  ORC    io_uring: 1.01 → 0.93 M rows/s (baseline also improved)
  Lance  io_uring SF=1: 0.95× → 1.01× (regression eliminated)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tsafin tsafin requested a review from Copilot March 5, 2026 21:44
@tsafin
Copy link
Owner Author

tsafin commented Mar 5, 2026

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 110292f913

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +129 to +133
// Pool full: submit new SQE + wait for oldest completion in 1 syscall.
context_->submit_and_wait(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
} else {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Release Parquet buffers by completed CQE, not FIFO index

After submit_and_wait(1) this path always clears pool_[read_idx_] and advances read_idx_, which assumes completions are strictly FIFO. In this same class writes are queued without a per-buffer completion identity (user_data is always 0), so if io_uring completes writes out of order under load, a still-in-flight buffer can be cleared/reused and the kernel may write corrupted Parquet bytes from mutated memory.

Useful? React with 👍 / 👎.

Comment on lines +130 to +134
// Pool full: submit new SQE + wait for oldest in 1 syscall.
context_->submit_and_wait(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
} else {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Release ORC buffers by completed CQE, not FIFO index

This branch clears pool_[read_idx_] immediately after submit_and_wait(1), but the stream does not track which slot actually completed (all writes use the same completion identity), so it relies on FIFO completion ordering that io_uring does not guarantee. When completions are reordered, a buffer can be recycled before its write finishes, which risks corrupting ORC output data.

Useful? React with 👍 / 👎.

Comment on lines +1469 to +1473
if (auto* pw = dynamic_cast<tpch::ParquetWriter*>(writer.get())) {
if (!opts.zero_copy) {
std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n";
} else {
pw->enable_io_uring(true);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ensure --parquet-io-uring activates the streaming writer path

The CLI flag only toggles ParquetWriter::enable_io_uring(true), but Parquet's io_uring stream is used only inside init_file_writer() during streaming mode. Since this call site never enables streaming for the normal --zero-copy flow, --parquet-io-uring is effectively a no-op in the documented usage path and users do not get the requested I/O backend.

Useful? React with 👍 / 👎.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds new high-performance write paths for Parquet and ORC using io_uring, and enables aggressive ORC dictionary encoding to reduce file size and improve write throughput in TPC-H workloads.

Changes:

  • Add IoUringOutputStream (Arrow OutputStream) and wire it into Parquet streaming writes behind --parquet-io-uring.
  • Add OrcIoUringStream (ORC OutputStream) and wire it into ORC writes behind --orc-io-uring, plus force dictionary encoding for strings.
  • Harden the shared AsyncIOContext with sysfs-based queue depth calibration and an io_uring_submit_and_wait drain path.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
src/writers/parquet_writer.cpp Adds an io_uring-backed Arrow OutputStream and selects it in streaming mode when enabled.
src/writers/orc_writer.cpp Adds an io_uring-backed ORC OutputStream, adds ORC dictionary-encoding option, and wires the new stream into writer creation.
src/main.cpp Adds CLI flags and wiring to enable Parquet/ORC io_uring paths.
src/async/io_uring_context.cpp Adds sysfs queue-depth calibration, shared worker-pool attachment, and submit_and_wait().
include/tpch/parquet_writer.hpp Exposes enable_io_uring() and tracks use_io_uring_.
include/tpch/orc_writer.hpp Exposes enable_io_uring() and tracks use_io_uring_ plus stream ownership.
include/tpch/async_io.hpp Declares submit_and_wait() and calibrate_queue_depth() (plus stubs when disabled).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +93 to +97
while (context_->pending_count() > 0) {
context_->wait_completions(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IoUringOutputStream clears buffers by advancing read_idx_ after each completion, assuming io_uring CQEs arrive in the same FIFO order as submissions. io_uring does not guarantee in-order completions, so this can clear/reuse a buffer that the kernel is still reading from, corrupting output. Use per-SQE user_data to identify which pool slot completed (e.g., tag each queued write with the slot index or a monotonically increasing ID) and clear exactly those completed slots based on returned CQEs.

Suggested change
while (context_->pending_count() > 0) {
context_->wait_completions(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
}
// Wait for all pending completions without assuming FIFO order.
while (context_->pending_count() > 0) {
context_->wait_completions(1);
}
// At this point, no I/O is in flight; it is safe to reset the pool.
for (auto &buf : pool_) {
buf.clear();
}
read_idx_ = 0;
write_idx_ = 0;

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +135
arrow::Status Flush() override {
// Submit any partial buffer.
if (!pool_[write_idx_].empty()) {
ARROW_RETURN_NOT_OK(submit_head());
}
// Drain all remaining in-flight SQEs.
if (context_->queued_count() > 0) {
context_->submit_queued();
}
while (context_->pending_count() > 0) {
context_->wait_completions(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
}
return arrow::Status::OK();
}

arrow::Status Close() override {
if (closed_) return arrow::Status::OK();
ARROW_RETURN_NOT_OK(Flush());
::close(fd_);
fd_ = -1;
closed_ = true;
return arrow::Status::OK();
}

bool closed() const override { return closed_; }

arrow::Result<int64_t> Tell() const override {
return file_offset_ + static_cast<int64_t>(pool_[write_idx_].size());
}

private:
// Submit pool_[write_idx_] to io_uring, then advance write_idx_.
//
// Invariant: (write_idx_ - read_idx_ + kNumBufs) % kNumBufs == pending_count()
// When the pool is full (pending == kNumBufs-1), drain the oldest slot first
// using submit_and_wait(1) — one syscall for both submit and wait.
arrow::Status submit_head() {
auto& cur = pool_[write_idx_];

context_->queue_write(fd_, cur.data(), cur.size(), file_offset_, 0);
file_offset_ += static_cast<int64_t>(cur.size());

if (context_->pending_count() + 1 >= kNumBufs) {
// Pool full: submit new SQE + wait for oldest completion in 1 syscall.
context_->submit_and_wait(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
} else {
context_->submit_queued();
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IoUringOutputStream::Write/Flush/submit_head call AsyncIOContext methods that throw std::runtime_error on failure, but the Arrow OutputStream contract expects errors to be reported via arrow::Status/Result (not exceptions). An exception escaping through Arrow/Parquet code paths can lead to std::terminate or skipped cleanup. Catch std::exception inside these methods and translate to an appropriate arrow::Status (e.g., Status::IOError).

Copilot uses AI. Check for mistakes.
Comment on lines +35 to +78
class OrcIoUringStream : public orc::OutputStream {
// 512 KB per SQE — matches Rust CHUNK_SIZE
static constexpr size_t kChunkSize = 512 * 1024;
// Circular pool of staging buffers.
// At steady state kNumBufs-1 = 7 SQEs are in-flight while we fill the 8th.
static constexpr int kNumBufs = 8;

std::string filepath_;
int fd_ = -1;
uint64_t file_offset_ = 0;

std::array<std::vector<uint8_t>, kNumBufs> pool_;
int write_idx_ = 0; // slot currently being filled
int read_idx_ = 0; // oldest submitted slot (FIFO order)

std::unique_ptr<AsyncIOContext> context_;

public:
explicit OrcIoUringStream(const std::string& filepath)
: filepath_(filepath) {
fd_ = ::open(filepath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0) {
throw std::runtime_error("OrcIoUringStream: open failed: " + std::string(strerror(errno)));
}
// Calibrate queue depth from the target block device via sysfs.
uint32_t qd = AsyncIOContext::calibrate_queue_depth(filepath.c_str());
context_ = std::make_unique<AsyncIOContext>(qd);
for (auto& b : pool_) b.reserve(kChunkSize);
}

~OrcIoUringStream() override {
try { close(); } catch (...) {}
}

void write(const void* buf, size_t length) override {
if (fd_ < 0) {
return; // Silently ignore writes after close (double-close guard)
}
const uint8_t* ptr = static_cast<const uint8_t*>(buf);
while (length > 0) {
auto& cur = pool_[write_idx_];
size_t space = kChunkSize - cur.size();
size_t to_copy = std::min(length, space);
cur.insert(cur.end(), ptr, ptr + to_copy);
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrcIoUringStream uses std::array/std::vector and std::min but this file doesn't include , , or . Depending on transitive includes is fragile and can break compilation; add the missing standard headers explicitly.

Copilot uses AI. Check for mistakes.
<< " --lance-io-uring Use io_uring for Lance disk writes (Linux only)\n"
#endif
#ifdef TPCH_ENABLE_ASYNC_IO
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (requires --streaming, Linux only)\n"
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage text for --parquet-io-uring says it "requires --streaming", but there is no --streaming CLI option in this program. This makes the help output misleading; update the message to reference the actual prerequisite(s) (e.g., the Parquet streaming mode/zero-copy mode that triggers enable_streaming_write).

Suggested change
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (requires --streaming, Linux only)\n"
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (Linux only)\n"

Copilot uses AI. Check for mistakes.
Comment on lines +1470 to +1476
if (!opts.zero_copy) {
std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n";
} else {
pw->enable_io_uring(true);
if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled\n";
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--parquet-io-uring currently only calls ParquetWriter::enable_io_uring(true) when --zero-copy is set, but ParquetWriter only switches to the io_uring OutputStream in init_file_writer() (streaming mode). In the current CLI flow, --zero-copy generation paths do not enable Parquet streaming mode (enable_streaming_write), so this flag can be a no-op. Either ensure streaming mode is enabled when --parquet-io-uring is requested, or change the gating/warning so it matches the actual conditions under which the io_uring stream is used.

Suggested change
if (!opts.zero_copy) {
std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n";
} else {
pw->enable_io_uring(true);
if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled\n";
}
// Ensure Parquet streaming mode is enabled so the io_uring OutputStream is used
pw->enable_streaming_write(true);
pw->enable_io_uring(true);
if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled (streaming mode)\n";

Copilot uses AI. Check for mistakes.
Comment on lines +43 to +74
std::array<std::vector<uint8_t>, kNumBufs> pool_;
int write_idx_ = 0; // slot currently being filled
int read_idx_ = 0; // oldest submitted slot (FIFO order)

std::unique_ptr<AsyncIOContext> context_;

public:
explicit IoUringOutputStream(const std::string& filepath)
: filepath_(filepath) {
fd_ = ::open(filepath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0) {
throw std::runtime_error("IoUringOutputStream: open failed: " + std::string(strerror(errno)));
}
// Calibrate queue depth from the target block device via sysfs.
uint32_t qd = AsyncIOContext::calibrate_queue_depth(filepath.c_str());
context_ = std::make_unique<AsyncIOContext>(qd);
for (auto& b : pool_) b.reserve(kChunkSize);
}

~IoUringOutputStream() override {
if (!closed_) {
try { Close(); } catch (...) {}
}
}

arrow::Status Write(const void* data, int64_t nbytes) override {
const uint8_t* ptr = static_cast<const uint8_t*>(data);
while (nbytes > 0) {
auto& cur = pool_[write_idx_];
size_t space = kChunkSize - cur.size();
size_t to_copy = std::min(static_cast<size_t>(nbytes), space);
cur.insert(cur.end(), ptr, ptr + to_copy);
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IoUringOutputStream uses std::array and std::min but this file doesn't include or . Relying on transitive includes is brittle and can break builds on different platforms/compilers; add the missing standard headers explicitly.

Copilot uses AI. Check for mistakes.
Comment on lines +98 to +101
while (context_->pending_count() > 0) {
context_->wait_completions(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrcIoUringStream clears buffers by advancing read_idx_ after each completion, which assumes completions happen in submission (FIFO) order. io_uring CQEs can complete out-of-order, so this risks clearing/reusing a buffer still in use by the kernel and corrupting ORC output. Tag each queued write with user_data identifying the pool slot (or a unique ID) and clear only the buffers whose CQEs have completed.

Suggested change
while (context_->pending_count() > 0) {
context_->wait_completions(1);
pool_[read_idx_].clear();
read_idx_ = (read_idx_ + 1) % kNumBufs;
// Drain all in-flight I/O. We must not assume FIFO completion order,
// so we only wait for pending I/O to reach zero and leave buffers
// untouched; they will be destroyed with the stream object.
while (context_->pending_count() > 0) {
context_->wait_completions(1);

Copilot uses AI. Check for mistakes.
Comment on lines 391 to 392
} catch (const std::exception& e) {
schema_locked_ = false;
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the write_batch() writer-creation try/catch, the error cleanup handles orc_type_ and orc_output_stream_ but does not clean up the io_uring path (orc_io_uring_stream_). If createWriter() or stream construction throws after allocating OrcIoUringStream, schema_locked_ is reset and a later retry can leak the prior stream/fd or overwrite the pointer. Extend the catch cleanup to delete and null orc_io_uring_stream_ (similar to orc_output_stream_).

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +99
* Replaces the default FileOutputStream with an io_uring double-buffered writer.
*
* @param enable true to enable io_uring write path
*/
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ParquetWriter io_uring docstring says the implementation is "double-buffered", but IoUringOutputStream is implemented with an 8-buffer staging ring. Update the comment to match the actual buffering strategy to avoid misleading future maintainers.

Suggested change
* Replaces the default FileOutputStream with an io_uring double-buffered writer.
*
* @param enable true to enable io_uring write path
*/
* Replaces the default FileOutputStream with an io_uring-backed writer that uses
* an 8-buffer staging ring for asynchronous writes.
*
* @param enable true to enable io_uring write path
*/

Copilot uses AI. Check for mistakes.

/**
* Enable io_uring-backed output stream for ORC disk writes (Linux only).
* Replaces the default writeLocalFile() stream with a double-buffered io_uring writer.
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ORCWriter io_uring docstring says the stream is "double-buffered", but OrcIoUringStream uses an 8-buffer staging ring. Update the comment so it reflects the actual design.

Suggested change
* Replaces the default writeLocalFile() stream with a double-buffered io_uring writer.
* Replaces the default writeLocalFile() stream with an io_uring writer that uses an
* 8-buffer staging ring for multi-buffered asynchronous writes.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants