diff --git a/build-algorithm.md b/build-algorithm.md
new file mode 100644
index 0000000..7490726
--- /dev/null
+++ b/build-algorithm.md
@@ -0,0 +1,227 @@
# SSHash build algorithm

This note describes how `sshash build` constructs a dictionary while keeping
peak resident memory bounded by the user-supplied `-g` (in GiB).

The design has two ideas, applied uniformly:

1. **Spill, don't accumulate.** Every intermediate that grows with the input
   size is written to a tmp file under `-d` (tmp dir) rather than held in a
   `std::vector` / bit-vector in RAM. Producers append through a small write
   buffer; consumers re-read through a small read buffer
   (`buffered_record_stream`).
2. **Cap working buffers at a fraction of `-g`.** Buffers that live
   only inside one step are sized as `ram_limit_in_GiB · GiB / N` (with `N`
   typically 2 or 8). The constants are picked so that even when several
   buffers are alive at the same time across overlapping steps, their sum
   stays under the user budget, while heap fragmentation across step
   transitions is absorbed.

The build never materializes the final index in RAM. Instead, step 8
streams it directly to the user-supplied output file (or a tmp file, deleted
on exit, when the user did not pass `-o`). To run `--check`, `tools/build.cpp`
mmaps the saved file and runs the correctness queries against the mmap'd
dictionary.

---

## Pipeline overview

The orchestration is in `include/builder/dictionary_builder.hpp`
(`run_steps_1_through_7` + `build`). Per-step details are in
`src/builder/{encode_strings,compute_minimizer_tuples,build_sparse_and_skew_index}.cpp`.

| Step | What it produces | Where it lives between steps |
|------|------------------|------------------------------|
| 1 | Encoded `strings` bit-vector + `strings_offsets` | tmp files (`disk_backed_strings`, `disk_backed_offsets_builder`) |
| 1.1 | Compressed weights (only if `--weighted`) | `weights::builder` (in-RAM, bounded by run-length structure) |
| 2 | Per-thread sorted runs of `minimizer_tuple` | tmp files, one per flushed buffer |
| 3 | Single sorted run of all `minimizer_tuple`s | tmp file (k-way external merge) |
| 4 | Minimizers MPHF F | tmp file (spilled at end of step 5) |
| 5 | Minimizer values replaced by F(minimizer); buffers re-flushed in F-order | new sorted runs, tmp files |
| 6 | Single sorted run keyed by F(minimizer) | tmp file |
| 7.1 | Sparse-index components (`control_codewords`, `mid_load_buckets`) | tmp files |
| 7.2 | Skew-index components (`heavy_load_buckets`, per-partition MPHFs and `positions`) | tmp files |
| 8 | Final on-disk index file | streamed to output, tmp files removed |

After step 8 the dictionary object `d` is **not** query-ready: the spilled
components were copied into the output file but never read back into `d`.
`finalize_stats` reports `index_size_in_bytes` via `std::filesystem::file_size`
on the saved path.

---

## Step 1 — encode strings (`encode_strings.cpp`)

Iterates the input FASTA, producing the 2-bit-packed `strings` bit-vector
and the `strings_offsets` array (one offset per sequence + a sentinel).
Both go through disk-backed builders:

- **`disk_backed_strings`**: appends 2-bit characters into a small in-RAM
  word buffer; flushes the buffer to a tmp file when full.
- **`disk_backed_offsets_builder`**: appends one `uint64_t` offset
  per sequence into a small write buffer; flushes to a tmp file.

The in-RAM footprint of step 1 is proportional to the buffer size,
regardless of input size.

## Step 1.1 — weights (optional)

Only runs with `--weighted`.
The weights builder uses run-length encoding:
its in-RAM size is proportional to the number of distinct weights, not to
the number of k-mers.

## Step 2 — compute minimizer tuples (`compute_minimizer_tuples.cpp`)

Each thread streams its assigned slice of the input via the disk-backed
strings/offsets readers and emits `minimizer_tuple` records into a private
in-RAM buffer:

```cpp
buffer_size = (ram_limit · GiB) / (2 · sizeof(minimizer_tuple) · num_threads)
```

When the buffer fills, the thread sorts it in parallel and flushes a sorted
run to a tmp file (`minimizers_tuples::sort_and_flush`). The factor of 2
in the denominator leaves headroom for `std::sort`'s allocations and
inter-thread contention; the per-thread split makes the total in-RAM tuple
buffer ≈ `ram_limit / 2`.

## Step 3 — k-way external merge (`minimizers_tuples::merge`)

The N tmp files from step 2 are merged into a single sorted run via a
**winner-tree-based external-merge iterator** (`file_merging_iterator`).
Each input file is read through a `buffered_record_stream` with
`default_buffer_records = 4096` records, so the total in-RAM merge state is
`N · 4096 · sizeof(minimizer_tuple)` ≈ tens of MB even for very many runs.
The output is written through a small `std::ofstream` buffer.

When N == 1, the merge degenerates to a rename plus a single streaming scan
to collect bucket statistics; the same RAM bound holds.

## Step 4 — build minimizers MPHF

Builds an external-memory partitioned PHF over the distinct minimizers, using
pthash's `build_in_external_memory`. The minimizers are streamed from the
sorted run via `streaming_minimizers_iterator` (one buffered ifstream),
and pthash spills its own working hashes under `tmp_dirname`, capped by
`mphf_build_config.ram = ram_limit / 2`.

## Step 5 — replace minimizer values with F(minimizer)

The merged file is re-read in fixed-size blocks; each block is hashed in
parallel and re-flushed as a new sorted run.
Two RAM caps are combined:

```cpp
RAM_available = ram_limit · GiB − sizeof(F) − offsets_builder.num_bytes()
buffer_unbounded = RAM_available / (3 · sizeof(minimizer_tuple))  // 3× = read + sort scratch + write
buffer_cap = (ram_limit · GiB / 8) / sizeof(minimizer_tuple)
buffer_size = min(buffer_unbounded, buffer_cap)
```

The `/ 8` cap exists because step 5 leaves dirtied heap pages that linger
into later steps' allocations; capping at one-eighth of the budget keeps
the cumulative RSS under `ram_limit` when steps 6/7 start allocating.

After step 5, the minimizers MPHF F is **spilled to disk** and the in-RAM
copy is freed: subsequent steps only ever use F(minimizer) values, not F
itself.

## Step 6 — re-merge in F-order

Same machinery as step 3, applied to the new sorted runs from step 5.

## Step 7.1 — sparse index (`build_sparse_and_skew_index.cpp`)

Constructs `control_codewords` and `mid_load_buckets`. Both are produced as
on-disk `bits::compact_vector` files via `streaming_compact_vector_writer`,
so neither is ever materialized in RAM.

## Step 7.2 — skew index

The most RAM-sensitive step; it has three internal phases, all
disk-backed:

- **Phase B (k-mer extraction requests).** Heavy-bucket entries become
  `kmer_extraction_request` records. They are external-sorted by
  `starting_pos` so that k-mer extraction reduces to a single forward
  scan over `strings`. The request buffer is capped at
  `ram_limit / 8 / sizeof(kmer_extraction_request)`; flushed runs are
  merged with `file_merging_iterator`.
- **Per-partition kmer files.** While walking `strings` in request-sorted
  order, each extracted k-mer is written to its partition's tmp file via
  a buffered writer; this file is the input to the partition's MPHF.
- **Phase C (per-partition MPHF + `positions`).** For each skew partition:
  1.
     Build the partition MPHF with pthash external-memory (`ram = ram_limit / 2`,
     iterator: `skew_partition_kmer_iterator` over the partition's tmp file).
  2. Stream-read the partition file again and emit `(F(kmer), pos_in_bucket)`
     tuples; external-sort them in `ram_limit / 8`-sized buffers and merge.
  3. Pack the sorted tuples into the partition's `positions`
     compact_vector via `streaming_compact_vector_writer`.

  Only the freshly-built MPHF for the *current* partition lives in RAM
  during phase C; once spilled (`essentials::save`), it is freed before the
  next partition starts. `positions` is fully on-disk.

## Step 8 — stream-save (`include/builder/streaming_save.hpp`)

The dictionary `d` is walked by `essentials::saver`, but every spilled
component is intercepted via an **address+type-keyed substitution map**
(`typed_address_sub`): when the saver visits a registered (address, type)
pair, it appends the bytes of the corresponding tmp file straight into the
output stream instead of reading from `d`. The strings bit-vector goes
through the same mechanism via `disk_backed_strings`.

Concretely, the registered substitutions are:

| Component | Source tmp file |
|-------------------------------------|---------------------------------|
| `m_ssi.codewords.control_codewords` | step 7.1 |
| `m_ssi.mid_load_buckets`            | step 7.1 |
| `m_ssi.ski.heavy_load_buckets`      | step 7.2 phase B |
| `m_ssi.codewords.mphf`              | step 5 spill |
| `m_ssi.ski.positions[i]`            | step 7.2 phase C, per partition |
| `m_ssi.ski.mphfs[i]`                | step 7.2 phase C, per partition |
| `m_spss.strings`                    | step 1 (`disk_backed_strings`) |

Because the substitutions are keyed by `(address, type)` pair, a struct
whose address coincides with its first member's address causes no
confusion.

After step 8 returns, the tmp files are removed and `finalize_stats` reads
the saved file's size with `std::filesystem::file_size`.

---

## How the RAM cap is enforced — summary

The on-disk index size grows with `num_kmers`. The build's **resident**
memory does not, because every component that scales with the input size is
either:

- **always on disk** (`strings`, `strings_offsets`, all sorted minimizer
  runs, the merged minimizers file, the sparse-index compact_vectors, the
  skew-index per-partition kmer/positions files, the codewords MPHF, and
  the per-partition MPHFs), or
- **bounded by a working buffer** sized as a fraction of `ram_limit`:

  | Buffer | Cap |
  |----------------------------------------|--------------------------|
  | Step 2 per-thread minimizer buffer | `ram_limit / 2 / num_threads` |
  | Step 5 hashing buffer | `min(ram_limit / 8, RAM_available / 3)` |
  | Step 7.2 kmer-request external sort | `ram_limit / 8` |
  | Step 7.2 phase-C `position_tuple` sort | `ram_limit / 8` |
  | pthash external-memory builds | `ram_limit / 2` (its own `ram` field) |
  | Every disk-backed reader/writer | `default_buffer_records ≈ 32 KiB` |
  | Every external merge front (per run) | `4096 · sizeof(T)` |

The fractions (`/2` for the dominant per-step buffer, `/8` for buffers
that span step boundaries) are chosen so that overlapping allocations and
heap fragmentation between steps stay under `ram_limit` in practice. There
is a hard floor of `min_ram_limit_in_GiB` (enforced in
`validate_and_normalize_build_config`) below which step 4's MPHF builder
no longer has enough room to make progress.

The result: peak RSS during the build is governed by `-g`, not by
the input size or by the on-disk index size, and the saved index is
byte-for-byte identical to one written by an in-RAM builder followed by
`essentials::save`.
diff --git a/external/pthash b/external/pthash
index e04a192..a95e814 160000
--- a/external/pthash
+++ b/external/pthash
@@ -1 +1 @@
-Subproject commit e04a1920ffeae9e7d876acd0362cab79605f7af3
+Subproject commit a95e8147a8ba1fa33b57fa24de7b5e674423e9a7
diff --git a/include/buckets_statistics.hpp b/include/buckets_statistics.hpp
index 6f582d7..13676f1 100644
--- a/include/buckets_statistics.hpp
+++ b/include/buckets_statistics.hpp
@@ -59,6 +59,12 @@ struct buckets_statistics {
     uint64_t max_bucket_size() const { return m_max_bucket_size; }
     uint64_t max_sparse_buckets_per_size() const { return m_max_sparse_buckets_per_size; }
 
+    /* Histogram bin: number of buckets whose size equals `s`. Bins beyond
+       MAX_BUCKET_SIZE are not tracked individually and return 0. */
+    uint64_t num_buckets_of_size(uint64_t s) const {
+        return s < m_bucket_sizes.size() ? m_bucket_sizes[s] : uint64_t(0);
+    }
+
     void print_full() const {
         std::cout << "=== bucket statistics (full) === \n";
         for (uint64_t bucket_size = 1, prev_bucket_size = 0, prev_kmers_in_buckets = 0,
diff --git a/include/builder/buffered_record_stream.hpp b/include/builder/buffered_record_stream.hpp
new file mode 100644
index 0000000..28c72de
--- /dev/null
+++ b/include/builder/buffered_record_stream.hpp
@@ -0,0 +1,103 @@
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <fstream>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+namespace sshash {
+
+/*
+    A small buffered, forward-only reader of fixed-size records over a
+    binary file. Records are read in fixed-capacity chunks (~`buffer_records
+    * sizeof(T)` bytes of RAM), so the per-instance footprint is bounded
+    independently of the file size.
+
+    Used as the underlying primitive by all of SSHash's builder readers
+    over on-disk record files (minimizer tuples, kmer requests, kmer
+    records, offset values, sorted-run records, etc.). The class is
+    move-only; for callers that need a copyable forward iterator (e.g.
+    pthash's `build_in_external_memory`, which takes an iterator by
+    value), wrap an instance in a `std::shared_ptr`.
+
+    Usage:
+        buffered_record_stream<T> s;
+        s.open(filename);
+        while (!s.empty()) {
+            consume(s.current());
+            s.advance();
+        }
+        s.close();
+*/
+template <typename T>
+struct buffered_record_stream {
+    static constexpr uint64_t default_buffer_records = 4096;
+
+    buffered_record_stream() = default;
+    buffered_record_stream(buffered_record_stream const&) = delete;
+    buffered_record_stream& operator=(buffered_record_stream const&) = delete;
+    buffered_record_stream(buffered_record_stream&&) = default;
+    buffered_record_stream& operator=(buffered_record_stream&&) = default;
+
+    /* Open `filename` for forward reading; optionally seek to byte
+       `start_byte` before priming the read window. */
+    void open(std::string const& filename, uint64_t buffer_records = default_buffer_records,
+              std::streamoff start_byte = 0) {
+        m_buf.resize(std::max<uint64_t>(1, buffer_records));
+        m_in.open(filename, std::ifstream::binary);
+        if (!m_in.is_open()) { throw std::runtime_error("cannot open file '" + filename + "'"); }
+        if (start_byte != 0) m_in.seekg(start_byte, std::ios::beg);
+        m_pos = 0;
+        m_size = 0;
+        m_eof = false;
+        refill();
+    }
+
+    void close() {
+        if (m_in.is_open()) m_in.close();
+        m_buf.clear();
+        m_buf.shrink_to_fit();
+        m_pos = 0;
+        m_size = 0;
+        m_eof = true;
+    }
+
+    bool is_open() const { return m_in.is_open(); }
+
+    /* True iff there are no more records in the stream. */
+    bool empty() const { return m_pos >= m_size; }
+
+    /* Reference to the current record. Valid until the next `advance()`. */
+    T const& current() const {
+        assert(!empty());
+        return m_buf[m_pos];
+    }
+
+    /* Move to the next record; refills the buffer from disk on demand.
+    */
+    void advance() {
+        assert(!empty());
+        ++m_pos;
+        if (m_pos >= m_size && !m_eof) refill();
+    }
+
+private:
+    std::ifstream m_in;
+    std::vector<T> m_buf;
+    uint64_t m_pos = 0;
+    uint64_t m_size = 0;
+    bool m_eof = true;
+
+    void refill() {
+        m_pos = 0;
+        m_in.read(reinterpret_cast<char*>(m_buf.data()),
+                  static_cast<std::streamsize>(m_buf.size() * sizeof(T)));
+        const std::streamsize got = m_in.gcount();
+        m_size = static_cast<uint64_t>(got) / sizeof(T);
+        if (m_size == 0) m_eof = true;
+    }
+};
+
+}  // namespace sshash
diff --git a/include/builder/dictionary_builder.hpp b/include/builder/dictionary_builder.hpp
index dea89a5..97de32e 100644
--- a/include/builder/dictionary_builder.hpp
+++ b/include/builder/dictionary_builder.hpp
@@ -1,21 +1,142 @@
 #pragma once
 
+#include <cstdio>
+#include <filesystem>
+#include <unordered_map>
+
 #include "essentials.hpp"
 #include "include/dictionary.hpp"
 #include "include/offsets.hpp"
 #include "include/builder/util.hpp"
+#include "include/builder/disk_backed_strings.hpp"
+#include "include/builder/disk_backed_offsets_builder.hpp"
+#include "include/builder/streaming_compact_vector_writer.hpp"
+#include "include/builder/streaming_save.hpp"
 #include "include/buckets_statistics.hpp"
 
 namespace sshash {
 
+/*
+    Tmp file paths for the compact_vectors and MPHFs that step 7 spills
+    to disk. Populated by build_sparse_and_skew_index and injected into
+    the output by step 8 (stream-save).
+*/
+struct spilled_components {
+    std::string control_codewords_path;
+    std::string mid_load_buckets_path;
+    std::string heavy_load_buckets_path;
+    std::vector<std::string> skew_positions_paths;  // one entry per skew partition
+    std::string codewords_mphf_path;                // step-4 minimizers MPHF
+    std::vector<std::string> skew_mphfs_paths;      // one entry per skew partition
+
+    void clear_files() {
+        if (!control_codewords_path.empty()) std::remove(control_codewords_path.c_str());
+        if (!mid_load_buckets_path.empty()) std::remove(mid_load_buckets_path.c_str());
+        if (!heavy_load_buckets_path.empty()) std::remove(heavy_load_buckets_path.c_str());
+        if (!codewords_mphf_path.empty()) std::remove(codewords_mphf_path.c_str());
+        for (auto const& p : skew_positions_paths) {
+            if (!p.empty()) std::remove(p.c_str());
+        }
+        for (auto const& p : skew_mphfs_paths) {
+            if (!p.empty()) std::remove(p.c_str());
+        }
+    }
+};
+
 template <typename Offsets>
 struct dictionary_builder  //
 {
     dictionary_builder(build_configuration const& build_config)
-        : build_config(build_config), num_kmers(0), minimizers(build_config), total_time_musec(0) {}
+        : build_config(build_config)
+        , num_kmers(0)
+        , minimizers(build_config)
+        , strings_run_id(pthash::clock_type::now().time_since_epoch().count())
+        , total_time_musec(0) {}
+
+    ~dictionary_builder() {
+        strings_builder.remove_file();
+        strings_offsets_builder.remove_file();
+        spilled.clear_files();
+    }
 
-    void build(dictionary& d, std::string const& filename)  //
+    /*
+        Build the dictionary and stream-save it to `output_filename`
+        without ever materializing the spilled components or `strings`
+        in RAM. After this returns, `d` is *not* query-ready; reload the
+        saved file via `essentials::load` / `essentials::mmap` to query.
+    */
+    void build(dictionary& d,                       //
+               std::string const& filename,         //
+               std::string const& output_filename)  //
     {
+        run_steps_1_through_7(d, filename);
+        do_step("step 8 (stream-save dictionary to disk)", [&]() {
+            /* Address+type-keyed substitution map.
+               The saver replaces the visited byte content of any registered
+               (address, type) pair with the bytes of the corresponding tmp
+               file. Type matching disambiguates the case where a struct's
+               address coincides with the address of its first member. */
+            std::unordered_map subs;
+            if (!spilled.control_codewords_path.empty()) {
+                register_sub(subs, &d.m_ssi.codewords.control_codewords,
+                             spilled.control_codewords_path);
+            }
+            if (!spilled.mid_load_buckets_path.empty()) {
+                register_sub(subs, &d.m_ssi.mid_load_buckets, spilled.mid_load_buckets_path);
+            }
+            if (!spilled.heavy_load_buckets_path.empty()) {
+                register_sub(subs, &d.m_ssi.ski.heavy_load_buckets,
+                             spilled.heavy_load_buckets_path);
+            }
+            if (!spilled.codewords_mphf_path.empty()) {
+                register_sub(subs, &d.m_ssi.codewords.mphf, spilled.codewords_mphf_path);
+            }
+            /* Skew positions / mphfs: populate the owning_spans with
+               placeholders so the visit walks the right number of entries
+               and we can take their addresses for substitution. */
+            const std::size_t num_part =
+                std::max(spilled.skew_positions_paths.size(), spilled.skew_mphfs_paths.size());
+            if (num_part > 0) {
+                std::vector position_placeholders(num_part);
+                std::vector> mphf_placeholders(num_part);
+                d.m_ssi.ski.positions = std::move(position_placeholders);
+                d.m_ssi.ski.mphfs = std::move(mphf_placeholders);
+                for (std::size_t i = 0; i != spilled.skew_positions_paths.size(); ++i) {
+                    if (!spilled.skew_positions_paths[i].empty()) {
+                        register_sub(subs, &d.m_ssi.ski.positions[i],
+                                     spilled.skew_positions_paths[i]);
+                    }
+                }
+                for (std::size_t i = 0; i != spilled.skew_mphfs_paths.size(); ++i) {
+                    if (!spilled.skew_mphfs_paths[i].empty()) {
+                        register_sub(subs, &d.m_ssi.ski.mphfs[i], spilled.skew_mphfs_paths[i]);
+                    }
+                }
+            }
+            save_streaming(d, output_filename.c_str(), &d.m_spss.strings, strings_builder,
+                           std::move(subs));
+            strings_builder.remove_file();
+            spilled.clear_files();
+        });
+        finalize_stats(d, output_filename);
+    }
+
+    build_configuration
build_config;
+    uint64_t num_kmers;
+    minimizers_tuples minimizers;
+    disk_backed_offsets_builder<Offsets> strings_offsets_builder;
+    disk_backed_strings strings_builder;
+    weights::builder weights_builder;
+    spilled_components spilled;
+
+    uint64_t strings_run_id;
+
+    essentials::timer_type timer;
+    essentials::json_lines build_stats;
+    uint64_t total_time_musec;
+
+private:
+    void run_steps_1_through_7(dictionary& d, std::string const& filename) {
         d.m_k = build_config.k;
         d.m_m = build_config.m;
         d.m_spss.k = build_config.k;
@@ -32,8 +153,21 @@ struct dictionary_builder  //
 
         total_time_musec = 0;
 
+        {
+            std::stringstream ss_strings;
+            ss_strings << build_config.tmp_dirname << "/sshash.tmp.run_" << strings_run_id
+                       << ".strings.bin";
+            strings_builder.open_for_writing(ss_strings.str());
+            std::stringstream ss_offsets;
+            ss_offsets << build_config.tmp_dirname << "/sshash.tmp.run_" << strings_run_id
+                       << ".strings_offsets.bin";
+            strings_offsets_builder.open_for_writing(ss_offsets.str());
+        }
+
         do_step("step 1 (encode strings)", [&]() {
             encode_strings(filename);
+            strings_builder.freeze();
+            strings_offsets_builder.freeze();
             d.m_num_kmers = num_kmers;
             assert(strings_offsets_builder.size() >= 2);
             d.m_num_strings = strings_offsets_builder.size() - 1;
@@ -65,36 +199,48 @@ struct dictionary_builder  //
             minimizers.remove_tmp_file();
             assert(strings_offsets_builder.size() == 0);
         });
+    }
+
+    void finalize_stats(dictionary& d, std::string const& saved_path) {
+        /* `d`'s spilled components are empty placeholders post stream-save,
+           so read the on-disk index file's size via std::filesystem::file_size
+           rather than recomputing from `d`.
+        */
+        uint64_t num_bytes = 0;
+        std::error_code ec;
+        const auto sz = std::filesystem::file_size(saved_path, ec);
+        if (!ec) num_bytes = static_cast<uint64_t>(sz);
 
         if (build_config.verbose) {
             print_time(total_time_musec, "total time");
-            d.print_space_breakdown();
+            if (num_bytes > 0) {
+                std::cout << "total index size: " << num_bytes << " [B] -- "
+                          << essentials::convert(num_bytes, essentials::MB) << " [MB]\n";
+                std::cout << "  total: " << (num_kmers > 0 ? (8.0 * num_bytes) / num_kmers : 0.0)
+                          << " [bits/kmer]" << std::endl;
+            }
         }
 
-        build_stats.add("total_build_time_in_microsec", total_time_musec);
-        build_stats.add("index_size_in_bytes", (d.num_bits() + 7) / 8);
+        build_stats.add("total_build_time", musec_as_seconds_str(total_time_musec).c_str());
+        build_stats.add("index_size_in_bytes", num_bytes);
         build_stats.add("num_kmers", d.num_kmers());
         if (build_config.verbose) build_stats.print();
     }
 
-    build_configuration build_config;
-    uint64_t num_kmers;
-    minimizers_tuples minimizers;
-    typename Offsets::builder strings_offsets_builder;
-    bits::bit_vector::builder strings_builder;
-    weights::builder weights_builder;
-
-    essentials::timer_type timer;
-    essentials::json_lines build_stats;
-    uint64_t total_time_musec;
-
-private:
     void print_time(double time_in_musec, std::string const& message) {
         std::cout << "=== " << message << ": " << time_in_musec / 1'000'000 << " [sec] ("
                   << (time_in_musec * 1000) / num_kmers << " [ns/kmer])" << std::endl;
     }
 
+    /* Format a microsecond count as e.g. "7.292 [sec]" for the JSON-line
+       build_stats output. Three decimals = millisecond precision, which is
+       both compact and plenty precise for build-step durations.
+       */
+    static std::string musec_as_seconds_str(uint64_t musec) {
+        char buf[64];
+        std::snprintf(buf, sizeof(buf), "%.3f [sec]", static_cast<double>(musec) / 1.0e6);
+        return std::string(buf);
+    }
+
     template <typename Callback>
     void do_step(std::string const& step, Callback const& f) {
         timer.start();
@@ -103,7 +249,7 @@ struct dictionary_builder  //
         uint64_t step_elapsed_time_musec = timer.elapsed();
         total_time_musec += step_elapsed_time_musec;
         if (build_config.verbose) print_time(step_elapsed_time_musec, step);
-        build_stats.add(step, step_elapsed_time_musec);
+        build_stats.add(step, musec_as_seconds_str(step_elapsed_time_musec).c_str());
         timer.reset();
     }
 
@@ -114,11 +260,12 @@ struct dictionary_builder  //
 
     void build_mphf(dictionary& d) {
         const uint64_t num_minimizers = minimizers.num_minimizers();
-        mm::file_source<minimizer_tuple> input(minimizers.get_minimizers_filename(),
-                                               mm::advice::sequential);
-        minimizers_tuples_iterator iterator(input.data(), input.data() + input.size());
+        /* Stream minimizers from disk via std::ifstream (no mmap); the
+           iterator yields each distinct minimizer once.
+           */
+        streaming_minimizers_iterator iterator;
+        iterator.open(minimizers.get_minimizers_filename());
         d.m_ssi.codewords.build(iterator, num_minimizers, build_config);
-        input.close();
+        iterator.close();
         assert(d.m_ssi.codewords.size() == num_minimizers);
     }
 
@@ -126,7 +273,8 @@ struct dictionary_builder  //
         std::string filename = minimizers.get_minimizers_filename();
         std::ifstream input(filename, std::ifstream::binary);
 
-        auto const& f = d.m_ssi.codewords.mphf;
+        auto& f_mut = d.m_ssi.codewords.mphf;
+        auto const& f = f_mut;
 
         const uint64_t num_threads = build_config.num_threads;
         const uint64_t num_files_to_merge = minimizers.num_files_to_merge();
@@ -134,8 +282,10 @@ struct dictionary_builder  //
 
         uint64_t RAM_available_in_bytes = essentials::GiB / 2;  // at least 0.5 GB
         {
-            const uint64_t RAM_taken_in_bytes = (f.num_bits() + strings_builder.num_bits()) / 8 +
-                                                strings_offsets_builder.num_bytes();
+            /* `strings_builder` is now disk-backed; its in-RAM footprint is
+               bounded by its window size, not by the strings size. */
+            const uint64_t RAM_taken_in_bytes =
+                f.num_bits() / 8 + strings_offsets_builder.num_bytes();
             const uint64_t RAM_limit_in_bytes = build_config.ram_limit_in_GiB * essentials::GiB;
             if (RAM_limit_in_bytes > RAM_taken_in_bytes) {
                 RAM_available_in_bytes = std::max(RAM_limit_in_bytes - RAM_taken_in_bytes,
@@ -144,9 +294,17 @@ struct dictionary_builder  //
         }
 
         const uint64_t num_super_kmers = minimizers.num_super_kmers();
-        const uint64_t buffer_size = num_files_to_merge == 1
-                                         ? num_super_kmers
-                                         : (RAM_available_in_bytes / (3 * sizeof(minimizer_tuple)));
+        /* Cap the in-RAM buffer at ram_limit/8 worth of tuples so that,
+           even when subsequent steps fragment the heap, step 5's lingering
+           pages don't blow past the budget when stacked with later steps'
+           allocations.
+           */
+        const uint64_t buffer_cap_bytes = (build_config.ram_limit_in_GiB * essentials::GiB) / 8;
+        const uint64_t buffer_cap_records =
+            std::max(uint64_t(1) << 16, buffer_cap_bytes / sizeof(minimizer_tuple));
+        const uint64_t buffer_size_unbounded =
+            num_files_to_merge == 1 ? num_super_kmers
+                                    : (RAM_available_in_bytes / (3 * sizeof(minimizer_tuple)));
+        const uint64_t buffer_size = std::min(buffer_size_unbounded, buffer_cap_records);
 
         const uint64_t num_blocks = (num_super_kmers + buffer_size - 1) / buffer_size;
         assert(num_super_kmers > (num_blocks - 1) * buffer_size);
@@ -179,6 +337,19 @@ struct dictionary_builder  //
         }
 
         input.close();
+
+        /* The codewords MPHF is no longer needed during build (step 6 onward
+           reads minimizer values that step 5 has already replaced with
+           mphf hashes; step 7 references mphf hashes only as bucket ids).
+           Spill it to disk and free its in-RAM footprint. */
+        {
+            std::stringstream ss;
+            ss << build_config.tmp_dirname << "/sshash.tmp.run_" << strings_run_id
+               << ".codewords_mphf.bin";
+            spilled.codewords_mphf_path = ss.str();
+            essentials::save(f_mut, spilled.codewords_mphf_path.c_str());
+            f_mut = minimizers_pthash_type{};
+        }
     }
 };
diff --git a/include/builder/disk_backed_offsets_builder.hpp b/include/builder/disk_backed_offsets_builder.hpp
new file mode 100644
index 0000000..ff1a24c
--- /dev/null
+++ b/include/builder/disk_backed_offsets_builder.hpp
@@ -0,0 +1,278 @@
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <cstdio>
+#include <fstream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+
+#include "include/builder/buffered_record_stream.hpp"
+#include "include/offsets.hpp"
+
+namespace sshash {
+
+/*
+    A disk-backed drop-in for `Offsets::builder` that spills offset values
+    to a tmp file as they're appended, keeping only a small in-RAM write
+    buffer. The builder's RAM usage is bounded by the buffer size,
+    independently of the number of strings.
+
+    Interface mirrors `decoded_offsets::builder` / `encoded_offsets::builder`
+    enough for the SSHash build path:
+
+        reserve(n)                 no-op (kept for source compat)
+        push_back(val)             append to file (buffered)
+        front() / back() / size()  O(1), tracked separately
+        num_bytes()                in-RAM footprint of the builder
+        set_num_bits(nb)           stash bit-width metadata
+        num_bits_per_offset()      dispatched per Offsets type
+        encode(offset, begin, sid) pure, dispatched per Offsets type
+        build(target)              stream from disk into the final
+                                   compact / endpoints sequence
+        remove_file()              cleanup
+
+    Random-access `operator[]` is *not* supported. Callers that need to
+    walk a contiguous range of offsets must use a `reader`, which provides
+    forward-sequential reads via a small in-RAM buffer.
+*/
+template <typename Offsets>
+struct disk_backed_offsets_builder {
+    static_assert(std::is_same_v<Offsets, decoded_offsets> ||
+                      std::is_same_v<Offsets, encoded_offsets>,
+                  "disk_backed_offsets_builder supports decoded_offsets and encoded_offsets");
+
+    static constexpr uint64_t default_writer_buffer_records = uint64_t(1) << 12;  // 32 KiB
+    static constexpr uint64_t default_reader_buffer_records = uint64_t(1) << 12;  // 32 KiB
+
+    disk_backed_offsets_builder() = default;
+    disk_backed_offsets_builder(disk_backed_offsets_builder const&) = delete;
+    disk_backed_offsets_builder& operator=(disk_backed_offsets_builder const&) = delete;
+
+    void open_for_writing(std::string const& filename,
+                          uint64_t writer_buffer_records = default_writer_buffer_records) {
+        m_filename = filename;
+        m_writer_buffer_capacity = std::max<uint64_t>(1, writer_buffer_records);
+        m_buf.clear();
+        m_buf.reserve(m_writer_buffer_capacity);
+        m_size = 0;
+        m_front = 0;
+        m_back = 0;
+        m_have_front = false;
+        m_frozen = false;
+        m_writer.open(m_filename, std::ofstream::binary | std::ofstream::trunc);
+        if (!m_writer.is_open()) {
+            throw std::runtime_error("cannot open offsets tmp file '" + m_filename + "'");
+        }
+    }
+
+    /* No-op: kept for source-compatibility with the in-RAM builder.
+       */
+    void reserve(uint64_t /*n*/) {}
+
+    void push_back(uint64_t val) {
+        if (!m_have_front) {
+            m_front = val;
+            m_have_front = true;
+        }
+        m_back = val;
+        m_buf.push_back(val);
+        ++m_size;
+        if (m_buf.size() >= m_writer_buffer_capacity) flush_buffer();
+    }
+
+    /* Finish writing: flush the in-RAM buffer and close the writer. */
+    void freeze() {
+        if (m_frozen) return;
+        flush_buffer();
+        if (m_writer.is_open()) m_writer.close();
+        m_frozen = true;
+    }
+
+    uint64_t size() const { return m_size; }
+    uint64_t front() const { return m_front; }
+    uint64_t back() const { return m_back; }
+    std::string const& filename() const { return m_filename; }
+
+    /* In-RAM footprint of the builder (excluding the on-disk file). */
+    uint64_t num_bytes() const { return sizeof(m_nb) + m_buf.capacity() * sizeof(uint64_t); }
+
+    void set_num_bits(num_bits nb) { m_nb = nb; }
+
+    uint64_t num_bits_per_offset() const {
+        if constexpr (std::is_same_v<Offsets, decoded_offsets>) {
+            return m_nb.per_absolute_offset;
+        } else {
+            return m_nb.per_string_id + m_nb.per_relative_offset;
+        }
+    }
+
+    /* Pure: matches `decoded_offsets::builder::encode` /
+       `encoded_offsets::builder::encode`. Safe to call concurrently from
+       multiple threads (depends only on m_nb and m_size, both of which
+       are stable while compute_minimizer_tuples runs). */
+    uint64_t encode(uint64_t offset, uint64_t begin, uint64_t string_id) const {
+        if constexpr (std::is_same_v<Offsets, decoded_offsets>) {
+            (void)begin;
+            (void)string_id;
+            return offset;
+        } else {
+            assert(string_id < m_size);
+            assert(offset >= begin);
+            assert((offset - begin) < (uint64_t(1) << m_nb.per_relative_offset));
+            uint64_t relative_offset = offset - begin;
+            return (string_id << m_nb.per_relative_offset) + relative_offset;
+        }
+    }
+
+    /*
+        Forward-sequential reader over the offsets file. Each thread in
+        compute_minimizer_tuples should construct one for its assigned
+        index range; the per-thread RAM footprint is the buffer size only.
+        Built on top of `buffered_record_stream`.
+    */
+    struct reader {
+        reader() = default;
+
+        /* Open the file and seek so that the next `next()` call returns
+           `*(values + start_index)`. */
+        void open(std::string const& filename, uint64_t start_index,
+                  uint64_t buffer_records = default_reader_buffer_records) {
+            m_stream.open(filename, buffer_records,
+                          static_cast<uint64_t>(start_index * sizeof(uint64_t)));
+        }
+
+        void close() { m_stream.close(); }
+
+        /* Return the next offset and advance. Caller must ensure they
+           don't read past the end of the file. */
+        uint64_t next() {
+            if (m_stream.empty()) {
+                throw std::runtime_error("disk_backed_offsets_builder: read past end of file");
+            }
+            const uint64_t v = m_stream.current();
+            m_stream.advance();
+            return v;
+        }
+
+    private:
+        buffered_record_stream<uint64_t> m_stream;
+    };
+
+    /* Construct a reader positioned at `start_index`. Requires freeze(). */
+    reader make_reader(uint64_t start_index,
+                       uint64_t buffer_records = default_reader_buffer_records) const {
+        if (!m_frozen) {
+            throw std::runtime_error(
+                "disk_backed_offsets_builder: must freeze() before make_reader()");
+        }
+        reader r;
+        r.open(m_filename, start_index, buffer_records);
+        return r;
+    }
+
+    /*
+       A copyable forward iterator over the entire offsets file, suitable
+       for the `Iterator`-template `encode` / `build` calls in
+       `bits::endpoints_sequence` and `bits::compact_vector`. Wraps a
+       shared_ptr<buffered_record_stream<uint64_t>> so the iterator is
+       copyable; copies share the underlying stream state, which is what
+       those APIs expect.
+    */
+    struct full_iterator {
+        using iterator_category = std::forward_iterator_tag;
+        using value_type = uint64_t;
+        using difference_type = std::ptrdiff_t;
+        using reference = uint64_t const&;
+        using pointer = uint64_t const*;
+
+        full_iterator() = default;
+
+        void open(std::string const& filename,
+                  uint64_t buffer_records = default_reader_buffer_records) {
+            m_stream = std::make_shared<buffered_record_stream<uint64_t>>();
+            m_stream->open(filename, buffer_records);
+        }
+
+        uint64_t operator*() const {
+            assert(m_stream);
+            return m_stream->current();
+        }
+        full_iterator& operator++() {
+            assert(m_stream);
+            m_stream->advance();
+            return *this;
+        }
+
+    private:
+        std::shared_ptr<buffered_record_stream<uint64_t>> m_stream;
+    };
+
+    /*
+       Stream the offsets file into the target Offsets structure (mirrors
+       the in-RAM builder's `build`). After return, the file is removed
+       and `size()` resets to 0 to match the in-RAM builder, which clears
+       its m_v in `build`.
+    */
+    void build(Offsets& target) {
+        if (!m_frozen) freeze();
+        if (m_size == 0) {
+            remove_file();
+            reset_state();
+            return;
+        }
+
+        if constexpr (std::is_same_v<Offsets, decoded_offsets>) {
+            full_iterator it;
+            it.open(m_filename);
+            target.m_seq.encode(it, m_size, m_back);
+        } else {
+            full_iterator it;
+            it.open(m_filename);
+            /* Encoded values pack per_string_id + per_relative_offset bits
+               (see encode()), so the compact vector must be that wide. */
+            target.m_seq.build(it, m_size, num_bits_per_offset());
+            target.m_num_bits_per_relative_offset = m_nb.per_relative_offset;
+        }
+
+        remove_file();
+        reset_state();
+    }
+
+    /* Remove the on-disk tmp file (if any).
+    */
+    void remove_file() {
+        if (m_writer.is_open()) m_writer.close();
+        if (!m_filename.empty()) std::remove(m_filename.c_str());
+    }
+
+private:
+    std::string m_filename;
+    std::ofstream m_writer;
+    std::vector<uint64_t> m_buf;
+    uint64_t m_writer_buffer_capacity = default_writer_buffer_records;
+    uint64_t m_size = 0;
+    uint64_t m_front = 0;
+    uint64_t m_back = 0;
+    bool m_have_front = false;
+    bool m_frozen = false;
+    num_bits m_nb;
+
+    void flush_buffer() {
+        if (m_buf.empty()) return;
+        m_writer.write(reinterpret_cast<char const*>(m_buf.data()),
+                       static_cast<std::streamsize>(m_buf.size() * sizeof(uint64_t)));
+        m_buf.clear();
+    }
+
+    void reset_state() {
+        m_size = 0;
+        m_buf.clear();
+        m_buf.shrink_to_fit();
+        m_have_front = false;
+        m_front = 0;
+        m_back = 0;
+        m_frozen = false;
+    }
+};
+
+} // namespace sshash
diff --git a/include/builder/disk_backed_strings.hpp b/include/builder/disk_backed_strings.hpp
new file mode 100644
index 0000000..d267d4a
--- /dev/null
+++ b/include/builder/disk_backed_strings.hpp
@@ -0,0 +1,327 @@
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <cstdio>
+#include <fstream>
+#include <ostream>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "external/pthash/external/bits/include/bit_vector.hpp"
+
+namespace sshash {
+
+/*
+    Disk-backed storage for the SSHash `strings` bit-vector.
+
+    During step 1 (encode_strings) bits are appended via `append_bits`.
+    Internally only the trailing words are kept in RAM (a small "write
+    window"); completed words are flushed to a tmp file. RAM usage of the
+    writer is bounded by the window size, independently of the total
+    bit-vector size.
+
+    After `freeze()`, callers create one or more `reader`s. Each reader owns
+    an ifstream and a small in-RAM read window, and supports
+    forward-monotonic `get_word64(bit_pos)` reads. The reader matches the
+    interface that `kmer_iterator` expects.
+
+    For the standard dictionary save path, `load_into(bits::bit_vector&)`
+    materializes the full bit-vector in RAM (peaks briefly at strings size).
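
The word-packing rule behind `append_bits` can be shown in isolation. This is a minimal in-RAM model of the same logic (no spilling, illustrative `bit_appender` name), useful for seeing how a value straddles two 64-bit words:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Minimal model of the append_bits packing rule: new bits land at
// position (num_bits & 63) of the last word; any overflow beyond bit 63
// spills into a freshly pushed word.
struct bit_appender {
    std::vector<uint64_t> words;
    uint64_t num_bits = 0;

    void append_bits(uint64_t bits, uint64_t len) {
        assert(len <= 64);
        assert(len == 64 || (bits >> len) == 0);
        if (!len) return;
        const uint64_t pos = num_bits & 63;
        num_bits += len;
        if (pos == 0) {
            words.push_back(bits);
        } else {
            words.back() |= bits << pos;
            if (len > 64 - pos) words.push_back(bits >> (64 - pos));
        }
    }
};
```

With 2-bit characters (the DNA alphabet), exactly 32 characters fill one word, so a cross-word spill happens every 32 appended characters.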
+*/
+struct disk_backed_strings {
+    static constexpr uint64_t default_writer_buffer_words = uint64_t(1) << 16;  // 512 KiB
+    static constexpr uint64_t default_reader_window_words = uint64_t(1) << 16;  // 512 KiB
+
+    disk_backed_strings()
+        : m_num_bits(0)
+        , m_writer_buffer_words(default_writer_buffer_words)
+        , m_words_on_disk(0)
+        , m_frozen(false) {}
+
+    disk_backed_strings(disk_backed_strings const&) = delete;
+    disk_backed_strings& operator=(disk_backed_strings const&) = delete;
+
+    /* Open `filename` for writing; truncates any existing contents. */
+    void open_for_writing(std::string const& filename,
+                          uint64_t writer_buffer_words = default_writer_buffer_words) {
+        m_filename = filename;
+        m_writer_buffer_words = std::max<uint64_t>(2, writer_buffer_words);
+        m_num_bits = 0;
+        m_words_on_disk = 0;
+        m_frozen = false;
+        m_buf.clear();
+        m_buf.reserve(m_writer_buffer_words);
+        m_writer.open(m_filename, std::ofstream::binary | std::ofstream::trunc);
+        if (!m_writer.is_open()) {
+            throw std::runtime_error("cannot open strings tmp file '" + m_filename + "'");
+        }
+    }
+
+    /* No-op: kept for source-compatibility with bits::bit_vector::builder. */
+    void reserve(uint64_t /*num_bits*/) {}
+
+    /* Append `len` bits (`len` <= 64) from `bits`. Same semantics as
+       bits::bit_vector::builder::append_bits. */
+    void append_bits(uint64_t bits, uint64_t len) {
+        assert(len <= 64);
+        assert(len == 64 || (bits >> len) == 0);
+        if (!len) return;
+        const uint64_t pos_in_word = m_num_bits & 63;
+        m_num_bits += len;
+        if (pos_in_word == 0) {
+            m_buf.push_back(bits);
+        } else {
+            m_buf.back() |= bits << pos_in_word;
+            if (len > 64 - pos_in_word) m_buf.push_back(bits >> (64 - pos_in_word));
+        }
+        if (m_buf.size() > m_writer_buffer_words) flush_completed_words();
+    }
+
+    /* Flush any remaining buffered words and close the writer. After this,
+       the file is ready for `make_reader()` and `load_into()`.
+    */
+    void freeze() {
+        if (m_frozen) return;
+        if (!m_buf.empty()) {
+            m_writer.write(reinterpret_cast<char const*>(m_buf.data()),
+                           static_cast<std::streamsize>(m_buf.size() * sizeof(uint64_t)));
+            m_words_on_disk += m_buf.size();
+            m_buf.clear();
+            m_buf.shrink_to_fit();
+        }
+        m_writer.close();
+        m_frozen = true;
+    }
+
+    uint64_t num_bits() const { return m_num_bits; }
+    std::string const& filename() const { return m_filename; }
+    bool frozen() const { return m_frozen; }
+
+    /*
+        Forward-monotonic reader over the strings file.
+
+        `get_word64(bit_pos)` returns the 64-bit word starting at bit
+        position `bit_pos`. Successive calls must satisfy a forward-monotonic
+        access pattern in word units (calling code may seek forward via
+        `at()`-style calls in `kmer_iterator`, but never backward). Reads
+        past the end-of-file return zero (matching the sentinel
+        zero-padding the SSHash builder writes at the tail of `strings`).
+    */
+    struct reader {
+        reader() = default;
+        reader(reader&& other) noexcept { move_from(std::move(other)); }
+        reader& operator=(reader&& other) noexcept {
+            if (this != &other) {
+                close();
+                move_from(std::move(other));
+            }
+            return *this;
+        }
+        reader(reader const&) = delete;
+        reader& operator=(reader const&) = delete;
+        ~reader() { close(); }
+
+        void open(std::string const& filename, uint64_t num_bits,
+                  uint64_t window_capacity_words = default_reader_window_words) {
+            m_num_bits = num_bits;
+            m_total_words = (num_bits + 63) / 64;
+            m_window_capacity = std::max<uint64_t>(2, window_capacity_words);
+            m_window.assign(m_window_capacity, 0);
+            m_window_size = 0;
+            m_window_start_word = 0;
+            m_in.open(filename, std::ifstream::binary);
+            if (!m_in.is_open()) {
+                throw std::runtime_error("cannot open strings tmp file '" + filename + "'");
+            }
+            seek_window_to(0);
+        }
+
+        bool is_open() const { return m_in.is_open(); }
+
+        void close() {
+            if (m_in.is_open()) m_in.close();
+            m_window.clear();
+            m_window.shrink_to_fit();
+            m_window_size = 0;
+            m_window_start_word = 0;
+        }
+
+        uint64_t num_bits() const { return m_num_bits; }
+
+        uint64_t get_word64(uint64_t bit_pos) const {
+            const uint64_t block = bit_pos >> 6;
+            const uint64_t shift = bit_pos & 63;
+            ensure_window_covers(block);
+            uint64_t a =
+                (block >= m_window_start_word && block < m_window_start_word + m_window_size)
+                    ? m_window[block - m_window_start_word]
+                    : uint64_t(0);
+            uint64_t word = a >> shift;
+            if (shift) {
+                const uint64_t next = block + 1;
+                uint64_t b =
+                    (next >= m_window_start_word && next < m_window_start_word + m_window_size)
+                        ? m_window[next - m_window_start_word]
+                        : uint64_t(0);
+                word |= b << (64 - shift);
+            }
+            return word;
+        }
+
+    private:
+        mutable std::ifstream m_in;
+        uint64_t m_num_bits = 0;
+        uint64_t m_total_words = 0;
+        mutable std::vector<uint64_t> m_window;
+        uint64_t m_window_capacity = 0;
+        mutable uint64_t m_window_size = 0;
+        mutable uint64_t m_window_start_word = 0;
+
+        void seek_window_to(uint64_t target_word) const {
+            m_window_start_word = target_word;
+            if (target_word >= m_total_words) {
+                m_window_size = 0;
+                return;
+            }
+            m_in.clear();  // clear any prior eof
+            m_in.seekg(static_cast<std::streamoff>(target_word * sizeof(uint64_t)),
+                       std::ios::beg);
+            const uint64_t to_read = std::min(m_window_capacity, m_total_words - target_word);
+            m_in.read(reinterpret_cast<char*>(m_window.data()),
+                      static_cast<std::streamsize>(to_read * sizeof(uint64_t)));
+            const std::streamsize nread = m_in.gcount();
+            m_window_size = static_cast<uint64_t>(nread) / sizeof(uint64_t);
+        }
+
+        void ensure_window_covers(uint64_t block) const {
+            // We may need both `block` and `block + 1` (for cross-word shifts).
+            // The window covers [m_window_start_word, m_window_start_word + m_window_size).
+            const uint64_t need_end = block + 2;  // exclusive
+            if (block >= m_window_start_word && need_end <= m_window_start_word + m_window_size) {
+                return;
+            }
+            // Slide forward (backward seeks are not supported).
+ seek_window_to(block); + } + + void move_from(reader&& other) { + m_in = std::move(other.m_in); + m_num_bits = other.m_num_bits; + m_total_words = other.m_total_words; + m_window = std::move(other.m_window); + m_window_capacity = other.m_window_capacity; + m_window_size = other.m_window_size; + m_window_start_word = other.m_window_start_word; + other.m_num_bits = 0; + other.m_total_words = 0; + other.m_window_capacity = 0; + other.m_window_size = 0; + other.m_window_start_word = 0; + } + }; + + /* Create a new reader over the frozen file. */ + reader make_reader(uint64_t window_capacity_words = default_reader_window_words) const { + if (!m_frozen) { + throw std::runtime_error("disk_backed_strings: must freeze() before make_reader()"); + } + reader r; + r.open(m_filename, m_num_bits, window_capacity_words); + return r; + } + + /* + Stream the strings to `os` in the same byte format that + `essentials::generic_saver::visit(bits::bit_vector const&)` would + produce — i.e., + uint64_t m_num_bits; + size_t n; // number of 64-bit words + uint64_t m_data[n]; + — without ever materializing the full bit-vector in RAM. The bytes + are read from the tmp file in fixed-size chunks. + + This relies on `bits::bit_vector::visit_impl` writing exactly two + fields (`m_num_bits` and the `m_data` owning_span) and on + `generic_saver::visit_seq` writing `size_t n` followed by the raw + `n * sizeof(uint64_t)` bytes. If `bits::bit_vector` ever changes its + on-disk representation, this method must be updated to match. 
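
The header-plus-payload layout sketched above can be made concrete with a tiny standalone function (illustrative `serialize_words` name; it assumes the two-field visit format described in this comment and is not the sshash implementation):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Sketch of the byte layout that save_to() streams:
//   [uint64_t num_bits][size_t n][uint64_t words[n]]
inline std::vector<char> serialize_words(uint64_t num_bits, std::vector<uint64_t> const& words) {
    const std::size_t n = words.size();
    std::vector<char> out(sizeof(uint64_t) + sizeof(std::size_t) + n * sizeof(uint64_t));
    char* p = out.data();
    std::memcpy(p, &num_bits, sizeof(uint64_t));
    p += sizeof(uint64_t);
    std::memcpy(p, &n, sizeof(std::size_t));
    p += sizeof(std::size_t);
    std::memcpy(p, words.data(), n * sizeof(uint64_t));
    return out;
}
```

Because the header is fixed-size and the payload is raw words, the real `save_to` can emit it in bounded chunks straight from the tmp file, with no full in-RAM copy.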
+ */ + void save_to(std::ostream& os) const { + if (!m_frozen) { + throw std::runtime_error("disk_backed_strings: must freeze() before save_to()"); + } + const uint64_t num_bits = m_num_bits; + os.write(reinterpret_cast(&num_bits), sizeof(uint64_t)); + const uint64_t total_words = (num_bits + 63) / 64; + const std::size_t n = static_cast(total_words); + os.write(reinterpret_cast(&n), sizeof(std::size_t)); + if (total_words == 0) return; + std::ifstream in(m_filename, std::ifstream::binary); + if (!in.is_open()) { + throw std::runtime_error("cannot open strings tmp file '" + m_filename + "'"); + } + std::vector buffer(uint64_t(64) << 10); // 64 KiB + uint64_t bytes_remaining = total_words * sizeof(uint64_t); + while (bytes_remaining > 0) { + const std::streamsize chunk = + static_cast(std::min(buffer.size(), bytes_remaining)); + in.read(buffer.data(), chunk); + const std::streamsize got = in.gcount(); + if (got <= 0) { + throw std::runtime_error("unexpected EOF in strings tmp file '" + m_filename + "'"); + } + os.write(buffer.data(), got); + bytes_remaining -= static_cast(got); + } + in.close(); + } + + /* + Materialize the full bit-vector in RAM. This briefly peaks at the + bit-vector size and is used immediately before `essentials::save`. + */ + void load_into(bits::bit_vector& bv) const { + if (!m_frozen) { + throw std::runtime_error("disk_backed_strings: must freeze() before load_into()"); + } + bits::bit_vector::builder b(m_num_bits); + auto& data_vec = b.data(); + const uint64_t total_words = (m_num_bits + 63) / 64; + if (total_words > 0) { + std::ifstream in(m_filename, std::ifstream::binary); + if (!in.is_open()) { + throw std::runtime_error("cannot open strings tmp file '" + m_filename + "'"); + } + in.read(reinterpret_cast(data_vec.data()), + static_cast(total_words * sizeof(uint64_t))); + in.close(); + } + b.build(bv); + } + + /* Delete the on-disk strings file. 
*/ + void remove_file() { + if (!m_filename.empty()) std::remove(m_filename.c_str()); + } + +private: + std::string m_filename; + std::ofstream m_writer; + uint64_t m_num_bits; + std::vector m_buf; + uint64_t m_writer_buffer_words; + uint64_t m_words_on_disk; + bool m_frozen; + + void flush_completed_words() { + if (m_buf.size() < 2) return; + const uint64_t to_flush = m_buf.size() - 1; // keep last (possibly partial) word + m_writer.write(reinterpret_cast(m_buf.data()), + static_cast(to_flush * sizeof(uint64_t))); + m_words_on_disk += to_flush; + m_buf[0] = m_buf.back(); + m_buf.resize(1); + } +}; + +} // namespace sshash diff --git a/include/builder/file_merging_iterator.hpp b/include/builder/file_merging_iterator.hpp index 85b95ac..37cf24c 100644 --- a/include/builder/file_merging_iterator.hpp +++ b/include/builder/file_merging_iterator.hpp @@ -1,42 +1,60 @@ #pragma once -#include -#include -#include #include +#include +#include +#include +#include +#include +#include "buffered_record_stream.hpp" #include "util.hpp" namespace sshash { /* - Winner-tree-based implementation. + Winner-tree-based external-merge iterator over N sorted runs on disk. + + Each run is read with a small buffered std::ifstream (no mmap) so that + process RSS stays bounded by `num_files_to_merge * buffer_records * + sizeof(T)` regardless of total run size. Values are surfaced as + `T const&` from each stream's in-RAM buffer; the merge logic compares + those values directly instead of pointers into mmap'd memory. + + Required of T: + - copy-constructible / move-constructible, + - `static T T::max()` returning a strict upper bound (used as the + sentinel for exhausted streams in the winner tree), + - `bool operator<(T, T)`. 
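
The merge loop itself is independent of whether runs live in files or in RAM. Here is a self-contained sketch of the linear-scan path (the one taken when the number of runs is at most `scan_threshold`), with in-RAM vectors standing in for the buffered file streams; `merge_runs` is an illustrative name, not the sshash API:

```cpp
#include <cassert>
#include <cstdint>
#include <cstddef>
#include <vector>

// Repeatedly pick the run whose current head is smallest, emit it, and
// advance that run; stop when every run is exhausted.
inline std::vector<uint64_t> merge_runs(std::vector<std::vector<uint64_t>> runs) {
    std::vector<std::size_t> pos(runs.size(), 0);
    std::vector<uint64_t> out;
    while (true) {
        std::size_t min_i = runs.size();  // sentinel: "no live run found yet"
        for (std::size_t i = 0; i != runs.size(); ++i) {
            if (pos[i] == runs[i].size()) continue;  // run i exhausted
            if (min_i == runs.size() || runs[i][pos[i]] < runs[min_i][pos[min_i]]) min_i = i;
        }
        if (min_i == runs.size()) break;
        out.push_back(runs[min_i][pos[min_i]++]);
    }
    return out;
}
```

The winner tree replaces the inner linear scan with an O(log N) path update, which pays off once N exceeds `scan_threshold`.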
 */
 template <typename T>
 struct file_merging_iterator  //
 {
+    static constexpr uint64_t default_buffer_records = uint64_t(1) << 12;  // 4096 records
     const uint64_t scan_threshold = 16;
 
     template <typename FileNamesIterator>
-    file_merging_iterator(FileNamesIterator file_names_iterator, uint64_t num_files_to_merge)
-        : m_mm_files(num_files_to_merge)  //
+    file_merging_iterator(FileNamesIterator file_names_iterator, uint64_t num_files_to_merge,
+                          uint64_t buffer_records = default_buffer_records)  //
     {
-        if (num_files_to_merge == 0) return;
+        if (num_files_to_merge == 0) {
+            m_num_files_to_merge = 0;
+            return;
+        }
 
-        /* open files and create the input iterators */
-        m_iterators.reserve(num_files_to_merge);
+        m_streams.reserve(num_files_to_merge);
         for (uint64_t i = 0; i != num_files_to_merge; ++i, ++file_names_iterator) {
-            m_mm_files[i].open(*file_names_iterator, mm::advice::sequential);
-            m_iterators.push_back(
-                {m_mm_files[i].data(), m_mm_files[i].data() + m_mm_files[i].size()});
+            m_streams.emplace_back();
+            m_streams.back().open(*file_names_iterator, buffer_records);
         }
 
         m_num_files_to_merge = num_files_to_merge;
         m_min_idx = 0;
 
-        if (m_iterators.size() <= scan_threshold) {
+        if (m_streams.size() <= scan_threshold) {
             compute_min();
         } else {
-            /* build a winner tree */
+            /* build a winner tree (same shape as before, but the leaves
+               index into m_streams instead of carrying raw pointers).
*/ uint64_t n = num_files_to_merge; uint64_t m = 2 * n - 1; m_size = n; @@ -51,97 +69,82 @@ struct file_merging_iterator // bool has_next() { return m_num_files_to_merge != 0; } void next() { update(); } - T operator*() const { return *(m_iterators[m_min_idx].begin); } + T operator*() const { return m_streams[m_min_idx].current(); } void close() { - for (auto& mm_file : m_mm_files) mm_file.close(); - m_iterators.clear(); - m_mm_files.clear(); + for (auto& s : m_streams) s.close(); + m_streams.clear(); + m_streams.shrink_to_fit(); m_tree.clear(); + m_tree.shrink_to_fit(); } private: - struct pointer_pair { - T const* begin; - T const* end; - }; - std::vector m_iterators; - std::vector> m_mm_files; + /* Each input run is read via a small buffered ifstream. */ + using stream_t = buffered_record_stream; + std::vector m_streams; std::vector m_tree; - uint64_t m_begin, m_size; - uint64_t m_min_idx, m_num_files_to_merge; + uint64_t m_begin = 0, m_size = 0; + uint64_t m_min_idx = 0, m_num_files_to_merge = 0; void update() { - if (m_iterators.size() <= scan_threshold) { // compute min with a linear scan - auto& it = m_iterators[m_min_idx]; - it.begin += 1; - if (it.begin == it.end) { - m_iterators.erase(m_iterators.begin() + m_min_idx); + if (m_streams.size() <= scan_threshold) { + auto& s = m_streams[m_min_idx]; + s.advance(); + if (s.empty()) { + m_streams.erase(m_streams.begin() + m_min_idx); m_min_idx = 0; --m_num_files_to_merge; if (m_num_files_to_merge == 0) return; } compute_min(); - } else { // update the winner tree + } else { // winner tree m_min_idx = m_tree[0]; - assert(m_min_idx < m_iterators.size()); - auto& it = m_iterators[m_min_idx]; - it.begin += 1; + assert(m_min_idx < m_streams.size()); + auto& s = m_streams[m_min_idx]; + s.advance(); uint64_t p = m_begin + m_min_idx; - p -= (p >= m_tree.size()) * m_size; // p is the index of the leaf - if (it.begin == it.end) { + p -= (p >= m_tree.size()) * m_size; // p is the leaf index + if (s.empty()) { m_tree[p] = 
uint32_t(-1); --m_num_files_to_merge; } const T inf = T::max(); while (p) { uint64_t is_r_child = (p & 1) == 0; - uint32_t i = 0; uint32_t l = m_tree[p - is_r_child]; uint32_t r = m_tree[p + 1 - is_r_child]; - - T const* ptr_l = (l == uint32_t(-1)) ? &inf : m_iterators[l].begin; - T const* ptr_r = (r == uint32_t(-1)) ? &inf : m_iterators[r].begin; - i = (*ptr_l < *ptr_r) ? l : r; - - /* same as this code but the one above uses CMOV */ - // if (l == uint32_t(-1)) { - // i = r; - // } else if (r == uint32_t(-1)) { - // i = l; - // } else { - // i = *(m_iterators[l].begin) < *(m_iterators[r].begin) ? l : r; - // } - + T const& vl = (l == uint32_t(-1)) ? inf : m_streams[l].current(); + T const& vr = (r == uint32_t(-1)) ? inf : m_streams[r].current(); + uint32_t i = (vl < vr) ? l : r; uint64_t parent = (p - 1) / 2; m_tree[parent] = i; p = parent; } m_min_idx = m_tree[0]; } - }; + } uint32_t build(uint32_t p) { if (p >= m_tree.size()) return uint32_t(-1); if (p >= m_size - 1) return m_tree[p]; // leaf uint32_t l = build(2 * p + 1); uint32_t r = build(2 * p + 2); - uint32_t i = 0; const T inf = T::max(); - T const* ptr_l = (l == uint32_t(-1)) ? &inf : m_iterators[l].begin; - T const* ptr_r = (r == uint32_t(-1)) ? &inf : m_iterators[r].begin; - i = (*ptr_l < *ptr_r) ? l : r; + T const& vl = (l == uint32_t(-1)) ? inf : m_streams[l].current(); + T const& vr = (r == uint32_t(-1)) ? inf : m_streams[r].current(); + uint32_t i = (vl < vr) ? 
l : r; m_tree[p] = i; return i; } void compute_min() { m_min_idx = 0; - auto min_val = *m_iterators.front().begin; - for (uint64_t i = 1; i != m_iterators.size(); ++i) { - assert(m_iterators[i].begin != m_iterators[i].end); - auto val = *m_iterators[i].begin; + T min_val = m_streams.front().current(); + for (uint64_t i = 1; i != m_streams.size(); ++i) { + assert(!m_streams[i].empty()); + T const& val = m_streams[i].current(); if (val < min_val) { min_val = val; m_min_idx = i; diff --git a/include/builder/streaming_compact_vector_writer.hpp b/include/builder/streaming_compact_vector_writer.hpp new file mode 100644 index 0000000..e36e83b --- /dev/null +++ b/include/builder/streaming_compact_vector_writer.hpp @@ -0,0 +1,144 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace sshash { + +/* + Streams a `bits::compact_vector` to disk one entry at a time, accepting + `set(index, value)` calls in monotonically non-decreasing index order + (gaps are filled with zero, matching the default-zero semantics of an + in-RAM compact_vector::builder). + + The on-disk byte layout matches `bits::compact_vector::visit_impl`: + uint64_t m_size + uint64_t m_width + uint64_t m_mask + size_t n (= ceil(m_size * m_width / 64)) + uint64_t m_data[n] (little-endian bit-packed) + + Total RAM footprint: a 2-word rolling window plus the std::ofstream's + own buffer. Independently of the number of entries. 
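
The fixed-width packing the writer emits can be captured by a pair of small standalone helpers (illustrative `pack`/`unpack` names; this assumes the little-endian bit-packed layout described above, including the trailing padding word):

```cpp
#include <cassert>
#include <cstdint>
#include <cstddef>
#include <vector>

// Entry i occupies bits [i*width, (i+1)*width) of a little-endian word
// stream; an entry may straddle two consecutive 64-bit words.
inline std::vector<uint64_t> pack(std::vector<uint64_t> const& values, uint64_t width) {
    const uint64_t total_bits = values.size() * width;
    std::vector<uint64_t> words((total_bits + 63) / 64 + 1, 0);  // +1 trailing pad word
    for (std::size_t i = 0; i != values.size(); ++i) {
        const uint64_t bit = i * width, w = bit / 64, o = bit % 64;
        words[w] |= values[i] << o;
        if (o + width > 64) words[w + 1] |= values[i] >> (64 - o);
    }
    return words;
}

inline uint64_t unpack(std::vector<uint64_t> const& words, uint64_t width, uint64_t i) {
    const uint64_t mask = (width == 64) ? ~uint64_t(0) : ((uint64_t(1) << width) - 1);
    const uint64_t bit = i * width, w = bit / 64, o = bit % 64;
    uint64_t v = words[w] >> o;
    if (o + width > 64) v |= words[w + 1] << (64 - o);
    return v & mask;
}
```

Because an entry touches at most two consecutive words, a 2-word rolling window is sufficient for strictly monotone `set(index, value)` calls, which is exactly what the streaming writer exploits.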
+*/
+struct streaming_compact_vector_writer {
+    streaming_compact_vector_writer() = default;
+    streaming_compact_vector_writer(streaming_compact_vector_writer const&) = delete;
+    streaming_compact_vector_writer& operator=(streaming_compact_vector_writer const&) = delete;
+
+    void open(std::string const& filename, uint64_t num_entries, uint64_t width) {
+        if (width == 0)
+            throw std::runtime_error("streaming_compact_vector_writer: width must be > 0");
+        if (width > 64)
+            throw std::runtime_error("streaming_compact_vector_writer: width must be <= 64");
+        m_filename = filename;
+        m_num_entries = num_entries;
+        m_width = width;
+        /* Match `bits::compact_vector::builder`'s data layout, which
+           allocates `words_for(size * width) + 1` words: the trailing
+           padding word allows in-RAM `set_bits` to write across word
+           boundaries without bounds checking and is part of the
+           serialized `m_data` owning_span. */
+        const uint64_t packed_words = (num_entries == 0) ? 0 : (num_entries * width + 63) / 64;
+        m_total_words = packed_words + 1;
+        m_words_written = 0;
+        m_buf[0] = 0;
+        m_buf[1] = 0;
+        m_have_last_index = false;
+
+        m_out.open(filename, std::ofstream::binary | std::ofstream::trunc);
+        if (!m_out.is_open()) {
+            throw std::runtime_error("cannot open compact_vector tmp file '" + filename + "'");
+        }
+
+        /* Header (matches bits::compact_vector::visit_impl). */
+        write_pod(m_num_entries);
+        write_pod(m_width);
+        const uint64_t mask = (m_width == 64) ? uint64_t(-1) : ((uint64_t(1) << m_width) - 1);
+        write_pod(mask);
+        const std::size_t n = static_cast<std::size_t>(m_total_words);
+        write_pod(n);
+    }
+
+    /* Write a value at position `index`. Successive calls must satisfy
+       `index >= previous_index`; gaps are filled with zero.
*/ + void set(uint64_t index, uint64_t value) { + if (m_have_last_index) { assert(index >= m_last_index); } + m_have_last_index = true; + m_last_index = index; + + const uint64_t bit_offset = index * m_width; + const uint64_t word_index = bit_offset / 64; + const uint64_t bit_in_word = bit_offset % 64; + + /* Slide the 2-word window forward to cover word_index. Words below + are now finalized; emit them. */ + while (m_words_written < word_index) { + write_word(m_buf[0]); + m_buf[0] = m_buf[1]; + m_buf[1] = 0; + ++m_words_written; + } + + /* OR `value` (m_width low bits of it) into the window starting at + bit_in_word of m_buf[0]; overflow goes into m_buf[1]. */ + const uint64_t fits_in_word_0 = 64 - bit_in_word; + if (m_width <= fits_in_word_0) { + if (m_width == 64) { + /* bit_in_word must be 0 here */ + m_buf[0] = value; + } else { + m_buf[0] |= value << bit_in_word; + } + } else { + m_buf[0] |= value << bit_in_word; + m_buf[1] |= value >> fits_in_word_0; + } + } + + /* Flush remaining buffered words and close the file. 
+    */
+    void finalize() {
+        while (m_words_written < m_total_words) {
+            write_word(m_buf[0]);
+            m_buf[0] = m_buf[1];
+            m_buf[1] = 0;
+            ++m_words_written;
+        }
+        if (m_out.is_open()) m_out.close();
+    }
+
+    std::string const& filename() const { return m_filename; }
+    uint64_t num_entries() const { return m_num_entries; }
+    uint64_t width() const { return m_width; }
+
+    void remove_file() {
+        if (m_out.is_open()) m_out.close();
+        if (!m_filename.empty()) std::remove(m_filename.c_str());
+    }
+
+private:
+    std::string m_filename;
+    std::ofstream m_out;
+    uint64_t m_num_entries = 0;
+    uint64_t m_width = 0;
+    uint64_t m_total_words = 0;
+    uint64_t m_words_written = 0;
+    uint64_t m_buf[2] = {0, 0};
+    uint64_t m_last_index = 0;
+    bool m_have_last_index = false;
+
+    template <typename T>
+    void write_pod(T const& v) {
+        m_out.write(reinterpret_cast<char const*>(&v), sizeof(T));
+    }
+    void write_word(uint64_t w) {
+        m_out.write(reinterpret_cast<char const*>(&w), sizeof(uint64_t));
+    }
+};
+
+} // namespace sshash
diff --git a/include/builder/streaming_save.hpp b/include/builder/streaming_save.hpp
new file mode 100644
index 0000000..78db027
--- /dev/null
+++ b/include/builder/streaming_save.hpp
@@ -0,0 +1,173 @@
+#pragma once
+
+#include <cstddef>
+#include <fstream>
+#include <ostream>
+#include <stdexcept>
+#include <string>
+#include <typeindex>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "essentials.hpp"
+#include "external/pthash/external/bits/include/bit_vector.hpp"
+
+#include "include/builder/disk_backed_strings.hpp"
+
+namespace sshash {
+
+/*
+    A typed substitution: the saver replaces the visit byte content of an
+    object at `address` with the bytes of `filename` only if the visited
+    type T satisfies `std::type_index(typeid(T)) == type`.
+
+    Type discrimination is necessary because in C++ a struct's address
+    coincides with the address of its first member when the struct has
+    standard layout. Without the type check, a substitution registered
+    for an inner field would also fire (incorrectly) on every enclosing
+    parent that shares its address.
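
This ambiguity is easy to demonstrate with two hypothetical standard-layout types (illustrative `inner`/`outer` names, not sshash types): the enclosing object and its first member compare pointer-equal, so only the type tells them apart.

```cpp
#include <cassert>
#include <cstdint>
#include <typeinfo>

struct inner {
    uint64_t x = 0;
};
struct outer {
    inner first;       // a standard-layout struct shares its address
    uint64_t other = 0;  // with its first non-static data member
};
```

If the saver keyed substitutions on the address alone, a file registered for an `outer`'s `first` field would also be emitted when the visitor reaches the enclosing `outer` itself.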
+*/
+struct typed_address_sub {
+    std::string filename;
+    std::type_index type;
+};
+
+/*
+    A saver that mirrors `essentials::generic_saver`, except for two
+    interception mechanisms used during streaming save:
+
+    1. The `bits::bit_vector` instance whose address matches `strings_addr`
+       has its bytes streamed from `strings_storage` (which writes the
+       same on-disk format `bits::bit_vector::visit_impl` produces).
+
+    2. Any object whose address appears in `address_subs` has its visit
+       byte content replaced by a copy of the corresponding tmp file.
+       This is type-agnostic: it works for `bits::compact_vector`, for
+       pthash MPHFs, or anything else whose serialized form has been
+       saved to a file via `essentials::save`.
+
+    The substitution check is performed at the start of every visit
+    call (whatever T is); if no match, the call falls through to the
+    regular `essentials::generic_saver` logic (POD via save_pod, or
+    recursion via val.visit(*this)).
+*/
+struct streaming_strings_saver {
+    streaming_strings_saver(std::ostream& os,                            //
+                            bits::bit_vector const* strings_addr,        //
+                            disk_backed_strings const* strings_storage,  //
+                            std::unordered_map<void const*, typed_address_sub> address_subs)
+        : m_os(os)
+        , m_strings_addr(strings_addr)
+        , m_strings_storage(strings_storage)
+        , m_address_subs(std::move(address_subs)) {
+        if (m_strings_addr == nullptr || m_strings_storage == nullptr) {
+            throw std::runtime_error("streaming_strings_saver requires non-null arguments");
+        }
+    }
+
+    template <typename T>
+    void visit(T const& val) {
+        /* Type+address substitution (compact_vectors, MPHFs, etc.).
+           Both must match: address alone is ambiguous when a struct
+           shares its address with its first member.
+        */
+        void const* addr = static_cast<void const*>(&val);
+        auto it = m_address_subs.find(addr);
+        if (it != m_address_subs.end() && it->second.type == std::type_index(typeid(T))) {
+            stream_file_into_os(it->second.filename);
+            return;
+        }
+        /* Strings: dedicated callback because the on-disk strings file
+           holds raw words (not the bits::bit_vector serialized form);
+           `disk_backed_strings::save_to(os)` writes the visit_impl format
+           on the fly. */
+        if constexpr (std::is_same_v<T, bits::bit_vector>) {
+            if (&val == m_strings_addr) {
+                m_strings_storage->save_to(m_os);
+                return;
+            }
+        }
+        if constexpr (essentials::is_pod<T>::value) {
+            essentials::save_pod(m_os, val);
+        } else {
+            val.visit(*this);
+        }
+    }
+
+    template <typename T>
+    void visit(std::vector<T> const& vec) {
+        visit_seq(vec);
+    }
+
+    template <typename T>
+    void visit(essentials::owning_span<T> const& vec) {
+        visit_seq(vec);
+    }
+
+    std::size_t bytes() { return static_cast<std::size_t>(m_os.tellp()); }
+
+private:
+    std::ostream& m_os;
+    bits::bit_vector const* m_strings_addr;
+    disk_backed_strings const* m_strings_storage;
+    std::unordered_map<void const*, typed_address_sub> m_address_subs;
+
+    template <typename Vec>
+    void visit_seq(Vec const& vec) {
+        using T = typename Vec::value_type;
+        const std::size_t n = vec.size();
+        visit(n);
+        if constexpr (essentials::is_pod<T>::value) {
+            m_os.write(reinterpret_cast<char const*>(vec.data()),
+                       static_cast<std::streamsize>(sizeof(T) * n));
+        } else {
+            for (auto const& v : vec) visit(v);
+        }
+    }
+
+    void stream_file_into_os(std::string const& filename) {
+        std::ifstream in(filename, std::ifstream::binary);
+        if (!in.is_open()) {
+            throw std::runtime_error("cannot open spilled component file '" + filename + "'");
+        }
+        char buf[64 * 1024];
+        while (in.good()) {
+            in.read(buf, sizeof(buf));
+            const std::streamsize got = in.gcount();
+            if (got > 0) m_os.write(buf, got);
+        }
+        in.close();
+    }
+};
+
+/*
+    Save `t` to `filename`.
Any embedded bits::bit_vector matching + `strings_addr` is streamed from `strings_storage`; any embedded + object whose address appears in `address_subs` has its bytes copied + from the corresponding tmp file. Other fields are saved via the + standard essentials path. +*/ +template +void save_streaming(T const& t, char const* filename, // + bits::bit_vector const* strings_addr, // + disk_backed_strings const& strings_storage, // + std::unordered_map address_subs = {}) { + std::ofstream out(filename, std::ios::binary); + if (!out.good()) { + throw std::runtime_error(std::string("error opening file '") + filename + "' for writing"); + } + streaming_strings_saver saver(out, strings_addr, &strings_storage, std::move(address_subs)); + saver.visit(t); + out.close(); +} + +/* Helper: register a typed substitution at the address of `addr`. */ +template +inline void register_sub(std::unordered_map& subs, T const* addr, + std::string filename) { + subs.insert_or_assign(static_cast(addr), + typed_address_sub{std::move(filename), std::type_index(typeid(T))}); +} + +} // namespace sshash diff --git a/include/builder/util.hpp b/include/builder/util.hpp index b390436..94761b9 100644 --- a/include/builder/util.hpp +++ b/include/builder/util.hpp @@ -1,8 +1,11 @@ #pragma once -#include #include +#include +#include +#include +#include "buffered_record_stream.hpp" #include "file_merging_iterator.hpp" #include "parallel_sort.hpp" @@ -58,100 +61,92 @@ inline std::ostream& operator<<(std::ostream& os, minimizer_tuple const& mt) { return os; } -struct bucket_type { - bucket_type(minimizer_tuple const* begin, minimizer_tuple const* end) - : m_begin(begin) - , m_end(end) - , m_num_super_kmers(std::distance(begin, end)) - , m_num_minimizer_positions(0) // - { - uint64_t prev_pos_in_seq = constants::invalid_uint64; - while (begin != end) { - uint64_t pos_in_seq = (*begin).pos_in_seq; - if (pos_in_seq != prev_pos_in_seq) { - ++m_num_minimizer_positions; - prev_pos_in_seq = pos_in_seq; - } - 
++begin; - } - assert(m_num_minimizer_positions <= m_num_super_kmers); - } - - struct iterator { - iterator(minimizer_tuple const* begin) : m_begin(begin) {} - - inline minimizer_tuple operator*() const { return *m_begin; } - inline void operator++() { ++m_begin; } - bool operator==(iterator const& other) const { return m_begin == other.m_begin; } - bool operator!=(iterator const& other) const { return !(*this == other); } - - private: - minimizer_tuple const* m_begin; - }; +/* + Streaming forward iterator over a sorted minimizers tmp file that + yields each distinct `minimizer` value exactly once (i.e., one value + per bucket), built on top of `buffered_record_stream` + so RAM usage is constant. + + Copyable: pthash's `build_in_external_memory` takes the iterator by + value, so the underlying buffered stream is held via shared_ptr. + Copies share the stream state; pthash's local copy advances the + shared stream, and the original at the call site is unused after the + build returns. +*/ +struct streaming_minimizers_iterator { + using iterator_category = std::forward_iterator_tag; + using value_type = uint64_t; + using difference_type = std::ptrdiff_t; + using reference = uint64_t const&; + using pointer = uint64_t const*; - iterator begin() const { return iterator(m_begin); } - iterator end() const { return iterator(m_end); } + streaming_minimizers_iterator() = default; - /* - When a canonical index is built (option `--canonical`), - a minimizer offset can correspond to more than one super-kmer. - A super-kmer is uniquely identified by the couple - (minimizer offset, position of minimizer in the first kmer of the super-kmer). - These two components, together, give the - starting position of a super-kmer in the sequence. + void open(std::string const& filename) { + m_stream = std::make_shared>(); + m_stream->open(filename); + m_current = m_stream->empty() ? 
uint64_t(-1) : m_stream->current().minimizer; + } - So the method size() returns the number of minimizer - positions which is <= the number of superkmers. - */ - uint64_t num_super_kmers() const { return m_num_super_kmers; } - uint64_t size() const { return m_num_minimizer_positions; } + void close() { + if (m_stream) m_stream->close(); + m_stream.reset(); + } - minimizer_tuple const* begin_ptr() const { return m_begin; } - minimizer_tuple const* end_ptr() const { return m_end; } + uint64_t operator*() const { return m_current; } + streaming_minimizers_iterator& operator++() { + advance_to_next_minimizer(); + return *this; + } private: - minimizer_tuple const* m_begin; - minimizer_tuple const* m_end; - uint64_t m_num_super_kmers; - uint64_t m_num_minimizer_positions; + std::shared_ptr> m_stream; + uint64_t m_current = uint64_t(-1); + + void advance_to_next_minimizer() { + const uint64_t prev = m_current; + while (!m_stream->empty()) { + m_stream->advance(); + if (m_stream->empty()) return; // m_current holds last value + if (m_stream->current().minimizer != prev) { + m_current = m_stream->current().minimizer; + return; + } + } + } }; /* - Iterate over the "bucket" of a minimizer, i.e., - the sorted list of minimizer tuples - (minimizer, pos_in_seq, pos_in_kmer, num_kmers_in_superkmer). -*/ -struct minimizers_tuples_iterator { - typedef minimizer_tuple value_type; - using iterator_category = std::forward_iterator_tag; - - minimizers_tuples_iterator(minimizer_tuple const* begin, minimizer_tuple const* end) - : m_bucket_begin(begin), m_bucket_end(begin), m_end(end) { - m_bucket_end = next_begin(); - } + Streaming reader over a minimizers tmp file. Reads minimizer_tuple + records via std::ifstream (no mmap), and groups consecutive tuples by + minimizer into "buckets" with bounded RAM (~ one bucket at a time plus + one record of lookahead). 
- inline uint64_t minimizer() const { return (*m_bucket_begin).minimizer; } - inline uint64_t operator*() const { return minimizer(); } - inline void next() { - m_bucket_begin = m_bucket_end; - m_bucket_end = next_begin(); + The caller passes a vector to receive the bucket's tuples; for typical + inputs this peaks at max_bucket_size * sizeof(minimizer_tuple). +*/ +struct streaming_minimizer_bucket_reader { + void open(std::string const& filename) { m_stream.open(filename); } + + void close() { m_stream.close(); } + + bool has_next_bucket() const { return !m_stream.empty(); } + + /* Read the next bucket into `bucket_out` (cleared first). All tuples in + a bucket share the same minimizer. Returns the bucket's minimizer. */ + uint64_t next_bucket(std::vector& bucket_out) { + bucket_out.clear(); + assert(has_next_bucket()); + const uint64_t mm = m_stream.current().minimizer; + do { + bucket_out.push_back(m_stream.current()); + m_stream.advance(); + } while (!m_stream.empty() && m_stream.current().minimizer == mm); + return mm; } - inline void operator++() { next(); } - bool has_next() const { return m_bucket_begin != m_end; } - bucket_type bucket() const { return bucket_type(m_bucket_begin, m_bucket_end); } private: - minimizer_tuple const* m_bucket_begin; - minimizer_tuple const* m_bucket_end; - minimizer_tuple const* m_end; - - minimizer_tuple const* next_begin() { - if (m_bucket_begin == m_end) return m_end; - minimizer_tuple const* begin = m_bucket_begin; - uint64_t prev_minimizer = begin->minimizer; - while (++begin != m_end and begin->minimizer == prev_minimizer) {} - return begin; - } + buffered_record_stream m_stream; }; struct minimizers_tuples { @@ -218,17 +213,28 @@ struct minimizers_tuples { assert(m_num_minimizers == 0); assert(m_num_minimizer_positions == 0); assert(m_num_super_kmers == 0); - mm::file_source input(get_minimizers_filename(), - mm::advice::sequential); - for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); - 
it.has_next(); it.next()) // - { - auto bucket = it.bucket(); + + /* Single-pass count via streaming ifstream (no mmap). */ + streaming_minimizer_bucket_reader reader; + reader.open(get_minimizers_filename()); + std::vector bucket_buf; + while (reader.has_next_bucket()) { + reader.next_bucket(bucket_buf); + uint64_t bucket_size = 0; + { + uint64_t prev = constants::invalid_uint64; + for (auto const& mt : bucket_buf) { + if (mt.pos_in_seq != prev) { + ++bucket_size; + prev = mt.pos_in_seq; + } + } + } m_num_minimizers += 1; - m_num_minimizer_positions += bucket.size(); - m_num_super_kmers += bucket.num_super_kmers(); + m_num_minimizer_positions += bucket_size; + m_num_super_kmers += bucket_buf.size(); } - input.close(); + reader.close(); return; } diff --git a/include/constants.hpp b/include/constants.hpp index b252f6e..b884ca3 100644 --- a/include/constants.hpp +++ b/include/constants.hpp @@ -4,6 +4,7 @@ namespace sshash::constants { constexpr uint64_t invalid_uint64 = uint64_t(-1); constexpr uint64_t default_ram_limit_in_GiB = 8; +constexpr uint64_t min_ram_limit_in_GiB = 4; constexpr uint64_t seed = 1; /* for PTHash */ diff --git a/include/dictionary.hpp b/include/dictionary.hpp index a30b8c4..e553b6e 100644 --- a/include/dictionary.hpp +++ b/include/dictionary.hpp @@ -25,8 +25,16 @@ struct dictionary // , m_m(0) , m_canonical(false) {} - /* Build from input file. */ - void build(std::string const& input_filename, build_configuration const& build_config); + /* + Build from input file, streaming the resulting dictionary to + `output_filename` as it goes. The strings bit-vector and the + sparse/skew components are never fully materialized in RAM during + construction, so peak RAM is bounded by the build phase only. + After this returns, `*this` is *not* query-ready; load the saved + index back via `essentials::load` / `essentials::mmap` to query. 
+ */ + void build(std::string const& input_filename, build_configuration const& build_config, + std::string const& output_filename); essentials::version_number vnum() const { return m_vnum; } uint64_t num_kmers() const { return m_num_kmers; } diff --git a/include/minimizers_control_map.hpp b/include/minimizers_control_map.hpp index 834d607..5665cf9 100644 --- a/include/minimizers_control_map.hpp +++ b/include/minimizers_control_map.hpp @@ -13,8 +13,9 @@ struct minimizers_control_map // mphf_build_config.alpha = 0.94; mphf_build_config.seed = util::get_seed_for_hash_function(build_config); mphf_build_config.verbose = false; - mphf_build_config.num_threads = build_config.num_threads; - mphf_build_config.avg_partition_size = constants::avg_partition_size; + util::configure_mphf_threads_and_partition(mphf_build_config, build_config.num_threads, + build_config.ram_limit_in_GiB, + build_config.verbose, "minimizers MPHF"); mphf_build_config.ram = (build_config.ram_limit_in_GiB * essentials::GiB) / 2; mphf_build_config.tmp_dir = build_config.tmp_dirname; diff --git a/include/offsets.hpp b/include/offsets.hpp index e718ed3..6307471 100644 --- a/include/offsets.hpp +++ b/include/offsets.hpp @@ -5,6 +5,9 @@ namespace sshash { +template +struct disk_backed_offsets_builder; + struct num_bits { num_bits() : per_absolute_offset(0), per_relative_offset(0), per_string_id(0) {} uint64_t per_absolute_offset; @@ -101,6 +104,12 @@ struct offsets // visit_impl(visitor, *this); } + /* Allow disk_backed_offsets_builder to populate m_seq directly via a + streaming forward iterator (mirroring what `Seq`'s nested builder + does, but with on-disk values). 
+    */
+    template <typename T>
+    friend struct disk_backed_offsets_builder;
+
 protected:
     Seq m_seq;
     uint64_t m_num_bits_per_relative_offset;
diff --git a/include/util.hpp b/include/util.hpp
index bf9bebd..c29e748 100644
--- a/include/util.hpp
+++ b/include/util.hpp
@@ -199,6 +199,67 @@ static inline uint64_t get_seed_for_hash_function(build_configuration const& bui
     return build_config.seed != my_favourite_seed ? my_favourite_seed : ~my_favourite_seed;
 }
 
+/*
+    Configure pthash's `num_threads` and `avg_partition_size` so that the
+    parallel sub-partition build memory fits in the user's --ram-limit
+    without unilaterally reducing the user's requested thread count.
+
+    pthash's `partitioned_phf::build` builds the partitioned MPHF's
+    sub-partitions in parallel; each sub-partition allocates a `pairs_t`
+    of roughly `avg_partition_size * sizeof(pair)` bytes during the
+    `map` + sort step. So per-thread peak ≈ `avg_partition_size *
+    per_key_bytes`. We can scale `avg_partition_size` down to fit any
+    desired per-thread budget — the only floor is pthash's hash-search
+    quality, for which `avg_partition_size` should not go below ~100k.
+
+    Strategy: split half of `--ram-limit` evenly across the requested
+    threads (the other half covers sshash's own buffers). For each
+    thread compute `per_thread_budget`, derive a candidate
+    `avg_partition_size`, and use it (clamped at the default upper end
+    so we never make partitions larger than usual). Only when the
+    derived `avg_partition_size` falls below the floor do we fall back
+    to capping threads — in that case we emit a warning naming the MPHF
+    so the user knows the requested -t couldn't be honored.
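The strategy just described can be sanity-checked with a small standalone sketch. The `per_key_bytes = 32` constant and the ~100k floor are the values quoted above; `plan_mphf` and the default partition size of 2,000,000 are illustrative stand-ins (the real code reads `constants::avg_partition_size`):

```cpp
#include <algorithm>
#include <cstdint>

// Illustrative re-derivation of the sizing rule described above.
struct mphf_plan {
    uint64_t num_threads;
    uint64_t avg_partition_size;
};

mphf_plan plan_mphf(uint64_t ram_limit_in_GiB, uint64_t requested_threads,
                    uint64_t default_avg = 2'000'000 /* stand-in for constants value */) {
    constexpr uint64_t GiB = uint64_t(1) << 30;
    constexpr uint64_t per_key_bytes = 32;  // pairs_t entry + sort slack
    constexpr uint64_t min_avg = 100'000;   // hash-search quality floor
    const uint64_t pthash_ram = ram_limit_in_GiB * GiB / 2;  // half the budget
    const uint64_t per_thread = pthash_ram / std::max<uint64_t>(1, requested_threads);
    const uint64_t candidate = per_thread / per_key_bytes;
    if (candidate >= default_avg) return {requested_threads, default_avg};
    if (candidate >= min_avg) return {requested_threads, candidate};
    // Pathological case: cap threads so the floor still fits.
    const uint64_t max_threads =
        std::max<uint64_t>(1, pthash_ram / (per_key_bytes * min_avg));
    return {max_threads, min_avg};
}
```

For example, with `--ram-limit 4` and 8 threads, each thread gets 256 MiB, so the candidate is 8,388,608 keys per partition, above the default, which is therefore kept.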
+*/
+static inline void configure_mphf_threads_and_partition(pthash::build_configuration& mphf,  //
+                                                        uint64_t requested_num_threads,     //
+                                                        uint64_t ram_limit_in_GiB,          //
+                                                        bool verbose,                       //
+                                                        char const* mphf_name)              //
+{
+    constexpr uint64_t per_key_bytes = 32;  // pairs_t entry + sort slack
+    constexpr uint64_t min_avg_partition_size = uint64_t(100) * 1000;
+    const uint64_t default_avg = constants::avg_partition_size;
+
+    const uint64_t pthash_ram = (ram_limit_in_GiB * essentials::GiB) / 2;
+    const uint64_t per_thread = pthash_ram / std::max<uint64_t>(1, requested_num_threads);
+    const uint64_t avg_for_thread_budget = per_thread / per_key_bytes;
+
+    if (avg_for_thread_budget >= default_avg) {
+        /* Plenty of RAM per thread — keep the default partition size. */
+        mphf.num_threads = requested_num_threads;
+        mphf.avg_partition_size = default_avg;
+    } else if (avg_for_thread_budget >= min_avg_partition_size) {
+        /* Tighter per-thread budget: shrink partitions to fit; threads
+           honored. */
+        mphf.num_threads = requested_num_threads;
+        mphf.avg_partition_size = avg_for_thread_budget;
+    } else {
+        /* Pathological: not enough RAM per thread even at the floor.
+           Cap threads so the floor fits. */
+        const uint64_t max_threads =
+            std::max<uint64_t>(1, pthash_ram / (per_key_bytes * min_avg_partition_size));
+        if (verbose) {
+            std::cout << " --> WARNING: not enough RAM per thread for " << mphf_name
+                      << " (--ram-limit=" << ram_limit_in_GiB << " GiB, " << requested_num_threads
+                      << " requested threads): capping to " << max_threads
+                      << " threads at min partition size " << min_avg_partition_size << std::endl;
+        }
+        mphf.num_threads = max_threads;
+        mphf.avg_partition_size = min_avg_partition_size;
+    }
+}
+
 [[maybe_unused]] static bool ends_with(std::string const& str, std::string const& pattern) {
     if (pattern.size() > str.size()) return false;
     return std::equal(pattern.begin(), pattern.end(), str.end() - pattern.size());
diff --git a/src/builder/build.cpp b/src/builder/build.cpp
index e9eed1d..e4ee59a 100644
--- a/src/builder/build.cpp
+++ b/src/builder/build.cpp
@@ -6,25 +6,45 @@
 
 namespace sshash {
 
-template <class Kmer>
-void dictionary::build(std::string const& filename,
-                       build_configuration const& build_config)  //
+namespace {
+
+inline void validate_and_normalize_build_config(build_configuration& bc, uint64_t max_k,
+                                                uint64_t max_m)  //
 {
-    /* Validate the build configuration.
*/
-    if (build_config.k == 0) throw std::runtime_error("k must be > 0");
-    if (build_config.k > Kmer::max_k) {
-        throw std::runtime_error("k must be less <= " + std::to_string(Kmer::max_k) +
-                                 " but got k = " + std::to_string(build_config.k));
+    if (bc.k == 0) throw std::runtime_error("k must be > 0");
+    if (bc.k > max_k) {
+        throw std::runtime_error("k must be <= " + std::to_string(max_k) +
+                                 " but got k = " + std::to_string(bc.k));
     }
-    if (build_config.m == 0) throw std::runtime_error("m must be > 0");
-    if (build_config.m > Kmer::max_m) {
-        throw std::runtime_error("m must be less <= " + std::to_string(Kmer::max_m) +
-                                 " but got m = " + std::to_string(build_config.m));
+
+    if (bc.m == 0) throw std::runtime_error("m must be > 0");
+    if (bc.m > max_m) {
+        throw std::runtime_error("m must be <= " + std::to_string(max_m) +
+                                 " but got m = " + std::to_string(bc.m));
     }
-    if (build_config.m > build_config.k) throw std::runtime_error("m must be <= k");
+    if (bc.m > bc.k) throw std::runtime_error("m must be <= k");
 
-    dictionary_builder<Kmer> builder(build_config);
-    builder.build(*this, filename);
+    if (bc.ram_limit_in_GiB < constants::min_ram_limit_in_GiB) {
+        if (bc.verbose) {
+            std::cout << " --> NOTE: --ram-limit raised from " << bc.ram_limit_in_GiB
+                      << " GiB to the floor of " << constants::min_ram_limit_in_GiB << " GiB"
+                      << std::endl;
+        }
+        bc.ram_limit_in_GiB = constants::min_ram_limit_in_GiB;
+    }
+}
+
+}  // namespace
+
+template <class Kmer>
+void dictionary::build(std::string const& input_filename,
+                       build_configuration const& build_config,
+                       std::string const& output_filename)  //
+{
+    build_configuration bc = build_config;
+    validate_and_normalize_build_config(bc, Kmer::max_k, Kmer::max_m);
+    dictionary_builder<Kmer> builder(bc);
+    builder.build(*this, input_filename, output_filename);
 }
 
 }  // namespace sshash
diff --git a/src/builder/build_sparse_and_skew_index.cpp b/src/builder/build_sparse_and_skew_index.cpp
index 7ed9886..e7c1a17 100644
--- 
a/src/builder/build_sparse_and_skew_index.cpp +++ b/src/builder/build_sparse_and_skew_index.cpp @@ -1,7 +1,121 @@ +#include + #include "include/buckets_statistics.hpp" +#include "include/builder/streaming_compact_vector_writer.hpp" namespace sshash { +/* + A request to extract `num_kmers_in_super_kmer` consecutive k-mers from + `strings` starting at base position `starting_pos`. After requests are + externally sorted by `starting_pos`, k-mer extraction reduces to a single + forward scan over `strings`. +*/ +#pragma pack(push, 4) +struct kmer_extraction_request { + kmer_extraction_request() {} + kmer_extraction_request(uint64_t starting_pos, uint32_t partition_id, uint32_t pos_in_bucket, + uint32_t num_kmers_in_super_kmer) + : starting_pos(starting_pos) + , partition_id(partition_id) + , pos_in_bucket(pos_in_bucket) + , num_kmers_in_super_kmer(num_kmers_in_super_kmer) {} + + bool operator<(kmer_extraction_request const& o) const { return starting_pos < o.starting_pos; } + bool operator>(kmer_extraction_request const& o) const { return starting_pos > o.starting_pos; } + + static kmer_extraction_request max() { + return kmer_extraction_request(uint64_t(-1), uint32_t(-1), uint32_t(-1), uint32_t(-1)); + } + + uint64_t starting_pos; + uint32_t partition_id; + uint32_t pos_in_bucket; + uint32_t num_kmers_in_super_kmer; +}; +#pragma pack(pop) + +/* + A (mphf_pos, pos_in_bucket) record used to spill the per-skew-partition + `cvb_positions` to disk. We external-sort these by mphf_pos so the + streaming compact_vector writer can pack the final cvb_positions file in + a single forward pass. 
+*/ +#pragma pack(push, 4) +struct position_tuple { + position_tuple() {} + position_tuple(uint64_t mphf_pos, uint32_t pib) : mphf_pos(mphf_pos), pib(pib) {} + + bool operator<(position_tuple const& o) const { return mphf_pos < o.mphf_pos; } + bool operator>(position_tuple const& o) const { return mphf_pos > o.mphf_pos; } + static position_tuple max() { return {uint64_t(-1), uint32_t(-1)}; } + + uint64_t mphf_pos; + uint32_t pib; +}; +#pragma pack(pop) + +/* + Per-skew-partition tmp file record (written by step 7.2 phase (B), + consumed by phase (C)): a kmer's bit pattern + the pos_in_bucket + we'll later pack into the partition's positions compact_vector. +*/ +#pragma pack(push, 4) +template +struct skew_kmer_record_t { + using kmer_bits_t = decltype(Kmer{}.bits); + kmer_bits_t kmer_bits; + uint32_t pib; +}; +#pragma pack(pop) + +/* + Forward iterator over a per-skew-partition tmp file produced by phase + (B). Yields successive Kmer values via the minimal interface (`*it`, + `++it`) that pthash's external-memory partitioned PHF builder + consumes. + + pthash takes the iterator by value, so it must be copyable. The + underlying buffered_record_stream is held via shared_ptr and shared + between copies; pthash's copy advances the shared stream state, which + is fine because the original at the call site is unused after the + build returns. 
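The shared-stream-state pattern used by both streaming iterators (pass-by-value copies advancing one underlying stream) can be modeled in miniature. This sketch replaces the real `buffered_record_stream` with an in-memory vector; `shared_stream_iterator` and `sum_n` are illustrative names, not part of the codebase:

```cpp
#include <cstdint>
#include <cstddef>
#include <memory>
#include <vector>

// Minimal model of the pattern: a copyable forward "iterator" whose copies
// share one underlying stream via shared_ptr, so a callee that takes the
// iterator by value advances the same state the caller handed in.
struct shared_stream_iterator {
    void open(std::vector<uint64_t> v) {
        m_state = std::make_shared<state>();
        m_state->data = std::move(v);
    }
    uint64_t operator*() const { return m_state->data[m_state->pos]; }
    shared_stream_iterator& operator++() {
        ++m_state->pos;
        return *this;
    }

private:
    struct state {
        std::vector<uint64_t> data;
        std::size_t pos = 0;
    };
    std::shared_ptr<state> m_state;
};

// A callee taking the iterator by value (as pthash's external-memory
// builder does) still consumes the shared stream.
uint64_t sum_n(shared_stream_iterator it, std::size_t n) {
    uint64_t s = 0;
    for (std::size_t i = 0; i != n; ++i, ++it) s += *it;
    return s;
}
```

The design trade-off is that after the callee returns, the caller's copy no longer points at the stream's beginning, which is fine here because the call-site iterator is unused after the build.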
+*/ +template +struct skew_partition_kmer_iterator { + using iterator_category = std::forward_iterator_tag; + using value_type = Kmer; + using difference_type = std::ptrdiff_t; + using reference = Kmer const&; + using pointer = Kmer const*; + + skew_partition_kmer_iterator() = default; + + void open(std::string const& filename) { + m_stream = std::make_shared>>(); + m_stream->open(filename); + if (!m_stream->empty()) m_current.bits = m_stream->current().kmer_bits; + } + + void close() { + if (m_stream) m_stream->close(); + m_stream.reset(); + } + + Kmer const& operator*() const { return m_current; } + skew_partition_kmer_iterator& operator++() { + if (!m_stream->empty()) { + m_stream->advance(); + if (!m_stream->empty()) m_current.bits = m_stream->current().kmer_bits; + } + return *this; + } + +private: + std::shared_ptr>> m_stream; + Kmer m_current; +}; + template void dictionary_builder::build_sparse_and_skew_index( dictionary& d) // @@ -14,47 +128,60 @@ void dictionary_builder::build_sparse_and_skew_index( const uint64_t min_size = 1ULL << constants::min_l; const uint64_t num_bits_per_offset = strings_offsets_builder.num_bits_per_offset(); - mm::file_source input(minimizers.get_minimizers_filename(), - mm::advice::sequential); + const std::string minimizers_filename = minimizers.get_minimizers_filename(); buckets_statistics buckets_stats(num_minimizers, num_kmers, num_minimizer_positions); uint64_t num_buckets_larger_than_1_not_in_skew_index = 0; uint64_t num_buckets_in_skew_index = 0; - uint64_t num_super_kmers_in_buckets_larger_than_1 = 0; uint64_t num_minimizer_positions_of_buckets_larger_than_1 = 0; uint64_t num_minimizer_positions_of_buckets_in_skew_index = 0; - // First pass: collect bucket statistics to compute tighter bound - for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); // - it.has_next(); it.next()) // + /* + Pass 1: streaming statistics over the merged minimizers file. 
Buckets + are accumulated one at a time via std::ifstream-backed reads (no + mmap), so RAM usage is bounded by max_bucket_size * sizeof(tuple). + */ { - auto bucket = it.bucket(); - const uint64_t bucket_size = bucket.size(); - buckets_stats.add_bucket_size(bucket_size); - - if (bucket_size > 1) { - if (bucket_size <= min_size) { - ++num_buckets_larger_than_1_not_in_skew_index; - num_minimizer_positions_of_buckets_larger_than_1 += bucket_size; - } else { - ++num_buckets_in_skew_index; - num_minimizer_positions_of_buckets_in_skew_index += bucket_size; + streaming_minimizer_bucket_reader reader; + reader.open(minimizers_filename); + std::vector bucket_buf; + while (reader.has_next_bucket()) { + reader.next_bucket(bucket_buf); + uint64_t bucket_size = 0; + { + uint64_t prev = constants::invalid_uint64; + for (auto const& mt : bucket_buf) { + if (mt.pos_in_seq != prev) { + ++bucket_size; + prev = mt.pos_in_seq; + } + } + } + buckets_stats.add_bucket_size(bucket_size); + if (bucket_size > 1) { + if (bucket_size <= min_size) { + ++num_buckets_larger_than_1_not_in_skew_index; + num_minimizer_positions_of_buckets_larger_than_1 += bucket_size; + } else { + ++num_buckets_in_skew_index; + num_minimizer_positions_of_buckets_in_skew_index += bucket_size; + } + } + for (auto const& mt : bucket_buf) { + buckets_stats.add_num_kmers_in_super_kmer(bucket_size, mt.num_kmers_in_super_kmer); } - num_super_kmers_in_buckets_larger_than_1 += bucket.num_super_kmers(); - } - - for (auto mt : bucket) { - buckets_stats.add_num_kmers_in_super_kmer(bucket_size, mt.num_kmers_in_super_kmer); } + reader.close(); } assert(buckets_stats.num_buckets() == num_minimizers); - // Calculate bits needed for control codewords encoding: - // Encoding format: ((list_id << min_l) | (bucket_size - 2)) << 2 | status_code - // We need: 2 bits (status) + min_l bits (bucket_size) + bits for list_id - // list_id is bounded by the maximum number of buckets sharing the same size + /* + Calculate bits needed for 
control codewords encoding. + Encoding format: + ((list_id << min_l) | (bucket_size - 2)) << 2 | status_code + */ const uint64_t bits_for_list_id = std::ceil(std::log2(buckets_stats.max_sparse_buckets_per_size() + 1)); const uint64_t num_bits_for_control = @@ -67,18 +194,17 @@ void dictionary_builder::build_sparse_and_skew_index( std::cout << "num_bits_for_control = " << num_bits_for_control << std::endl; } - bits::compact_vector::builder control_codewords_builder; - control_codewords_builder.resize(num_minimizers, num_bits_for_control); - - strings_offsets_builder.build(d.m_spss.strings_offsets); - strings_builder.build(d.m_spss.strings); - - /* step 1. build sparse index */ - assert(buckets_stats.num_buckets() == num_minimizers); - const uint64_t max_bucket_size = buckets_stats.max_bucket_size(); const uint64_t log2_max_bucket_size = std::ceil(std::log2(max_bucket_size)); + uint64_t num_partitions = constants::max_l - constants::min_l + 1; + if (max_bucket_size < min_size) { + num_partitions = 0; + } else if (max_bucket_size < (1ULL << constants::max_l)) { + num_partitions = log2_max_bucket_size - constants::min_l; + } + assert(num_partitions <= 8); + if (build_config.verbose) { std::cout << "num_buckets_larger_than_1_not_in_skew_index " << num_buckets_larger_than_1_not_in_skew_index << "/" @@ -92,165 +218,267 @@ void dictionary_builder::build_sparse_and_skew_index( << std::endl; std::cout << "max_bucket_size " << max_bucket_size << std::endl; std::cout << "log2_max_bucket_size " << log2_max_bucket_size << std::endl; + std::cout << "num_partitions in skew index " << num_partitions << std::endl; } - std::vector buckets; - buckets.reserve(num_buckets_larger_than_1_not_in_skew_index + num_buckets_in_skew_index); - std::vector tuples; // backed memory - tuples.reserve(num_super_kmers_in_buckets_larger_than_1); + /* Materialize strings_offsets now (it's needed below to decode + pos_in_seq into absolute offsets when emitting heavy-bucket kmer + requests). 
`d.m_spss.strings` is materialized later (step 8) or + stream-saved directly. */ + strings_offsets_builder.build(d.m_spss.strings_offsets); - // Second pass: collect buckets > 1 for sorting AND handle size-1 buckets - for (minimizers_tuples_iterator it(input.data(), input.data() + input.size()); // - it.has_next(); it.next()) // - { - const uint64_t bucket_id = it.minimizer(); - auto bucket = it.bucket(); - const uint64_t bucket_size = bucket.size(); - - if (bucket_size == 1) { - // Handle size-1 buckets: encode directly into control codewords - uint64_t prev_pos_in_seq = constants::invalid_uint64; - for (auto mt : bucket) { - if (mt.pos_in_seq != prev_pos_in_seq) { - /* - For minimizers occurring once, store a (log(N)+1)-bit - code, as follows: |offset|0|, i.e., the LSB is 0. - */ - uint64_t code = mt.pos_in_seq << 1; // first LS bit encodes status code: 0 - assert(code < (uint64_t(1) << num_bits_for_control)); - control_codewords_builder.set(bucket_id, code); - prev_pos_in_seq = mt.pos_in_seq; - } - } - } else { - // Collect buckets > 1 for later processing - minimizer_tuple const* begin = tuples.data() + tuples.size(); - std::copy(bucket.begin_ptr(), bucket.end_ptr(), std::back_inserter(tuples)); - minimizer_tuple const* end = tuples.data() + tuples.size(); - buckets.push_back(bucket_type(begin, end)); + /* Precompute the layout of mid_load_buckets from the bucket-size + histogram. begin_buckets_of_size[s] is the start offset (in + positions, not bits) of size-s bucket positions in mid_load_buckets. 
*/ + std::vector begin_buckets_of_size(min_size + 1, 0); + for (uint64_t s = 3; s <= min_size; ++s) { + begin_buckets_of_size[s] = static_cast( // + begin_buckets_of_size[s - 1] + buckets_stats.num_buckets_of_size(s - 1) * (s - 1)); + } + d.m_ssi.begin_buckets_of_size = std::move(begin_buckets_of_size); + + /* All step-7.1 outputs are spilled to disk; the in-RAM dictionary + fields stay empty (they're populated later either from disk for + --check or substituted by the streaming saver). */ + const uint64_t step7_run_id = pthash::clock_type::now().time_since_epoch().count(); + auto step7_path = [&](std::string const& tag) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << step7_run_id << "." << tag + << ".bin"; + return ss.str(); + }; + + spilled.control_codewords_path = step7_path("control_codewords"); + spilled.mid_load_buckets_path = step7_path("mid_load_buckets"); + spilled.heavy_load_buckets_path = step7_path("heavy_load_buckets"); + + /* Streaming writers for the two compact_vectors that get strictly + monotonic indices during the combined pass (control_codewords: + indexed by bucket_id == mphf hash, monotonic across buckets in + file order; heavy_load_buckets: indexed by a single monotone + cursor advanced inside the heavy branch). */ + streaming_compact_vector_writer control_codewords_writer; + control_codewords_writer.open(spilled.control_codewords_path, num_minimizers, + num_bits_for_control); + streaming_compact_vector_writer heavy_load_writer; + heavy_load_writer.open(spilled.heavy_load_buckets_path, + num_minimizer_positions_of_buckets_in_skew_index, num_bits_per_offset); + + /* mid_load: per-size tmp files of raw uint64_t positions. Each file is + written monotonically within its size class. After the combined + pass we stream them in size order through a streaming + compact_vector writer to assemble the final mid_load_buckets file. 
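The `begin_buckets_of_size` layout is a prefix sum over the bucket-size histogram: each size-(s-1) bucket contributes s-1 positions before the size-s class begins. A standalone sketch of that computation (with `layout_mid_load` as an illustrative name, `num_buckets_of_size` indexed by bucket size):

```cpp
#include <cstdint>
#include <vector>

// begin[s] = start offset (in positions, not bits) of the size-s class
// inside mid_load_buckets: each size-(s-1) bucket contributes s-1 positions.
std::vector<uint32_t> layout_mid_load(std::vector<uint64_t> const& num_buckets_of_size,
                                      uint64_t min_size) {
    std::vector<uint32_t> begin(min_size + 1, 0);  // sizes 0 and 1 unused; size-2 starts at 0
    for (uint64_t s = 3; s <= min_size; ++s) {
        begin[s] = static_cast<uint32_t>(begin[s - 1] +
                                         num_buckets_of_size[s - 1] * (s - 1));
    }
    return begin;
}
```

For example, with three size-2 buckets, two size-3 buckets, and one size-4 bucket, the size-3 class starts at position 6 and the size-4 class at position 12.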
*/ + auto mid_load_per_size_path = [&](uint64_t s) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << step7_run_id << ".mid_load_size_" + << s << ".bin"; + return ss.str(); + }; + std::vector mid_load_per_size(min_size + 1); + for (uint64_t s = 2; s <= min_size; ++s) { + if (buckets_stats.num_buckets_of_size(s) == 0) continue; + mid_load_per_size[s].open(mid_load_per_size_path(s), + std::ofstream::binary | std::ofstream::trunc); + if (!mid_load_per_size[s].is_open()) { + throw std::runtime_error("cannot open mid_load per-size tmp file"); } } - assert(buckets.size() == - num_buckets_larger_than_1_not_in_skew_index + num_buckets_in_skew_index); - input.close(); + std::vector list_id_per_size(min_size + 1, 0); + uint64_t heavy_load_cursor = 0; - std::sort(buckets.begin(), buckets.end(), - [](bucket_type const& x, bucket_type const& y) { return x.size() < y.size(); }); - - uint64_t num_partitions = constants::max_l - constants::min_l + 1; - if (max_bucket_size < min_size) { - num_partitions = 0; - } else if (max_bucket_size < (1ULL << constants::max_l)) { - num_partitions = log2_max_bucket_size - constants::min_l; - } - assert(num_partitions <= 8); // so that we need 3 bits to encode a partition_id - - if (build_config.verbose) { - std::cout << "num_partitions in skew index " << num_partitions << std::endl; - std::cout << "num_minimizer_positions_of_buckets_larger_than_1 " - << num_minimizer_positions_of_buckets_larger_than_1 << "/" - << num_minimizer_positions << " (" - << (num_minimizer_positions_of_buckets_larger_than_1 * 100.0) / - num_minimizer_positions - << "%)" << std::endl; - std::cout << "num_minimizer_positions_of_buckets_in_skew_index " - << num_minimizer_positions_of_buckets_in_skew_index << "/" - << num_minimizer_positions << " (" - << (num_minimizer_positions_of_buckets_in_skew_index * 100.0) / - num_minimizer_positions - << "%)" << std::endl; - } + /* Per-partition kmer counts; filled during the heavy branch of the + 
combined pass below. */ + std::vector num_kmers_in_partition(num_partitions, 0); + /* Skew-index tmp file naming. */ + const uint64_t skew_run_id = pthash::clock_type::now().time_since_epoch().count(); + auto request_run_filename = [&](uint64_t id) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << skew_run_id << ".kmer_requests." + << id << ".bin"; + return ss.str(); + }; + auto skew_partition_filename = [&](uint64_t pid) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << skew_run_id << ".skew_kmers." << pid + << ".bin"; + return ss.str(); + }; + + /* External-sort buffer for kmer-extraction requests (formerly step 7.2 + phase A; now folded into the combined pass). Capped at ram_limit/8 + so heap fragmentation across steps doesn't push peak RSS past the + --ram-limit budget. */ + std::atomic num_request_runs{0}; + const uint64_t request_buffer_capacity = + std::max(uint64_t(1) << 16, (build_config.ram_limit_in_GiB * essentials::GiB) / + (8 * sizeof(kmer_extraction_request))); + std::vector request_buffer; + request_buffer.reserve(request_buffer_capacity); + auto flush_request_buffer = [&]() { + if (request_buffer.empty()) return; + parallel_sort(request_buffer, build_config.num_threads, + [](kmer_extraction_request const& a, kmer_extraction_request const& b) { + return a.starting_pos < b.starting_pos; + }); + const uint64_t id = num_request_runs.fetch_add(1); + const std::string fn = request_run_filename(id); + std::ofstream out(fn, std::ofstream::binary); + if (!out.is_open()) throw std::runtime_error("cannot open file"); + out.write(reinterpret_cast(request_buffer.data()), + request_buffer.size() * sizeof(kmer_extraction_request)); + out.close(); + request_buffer.clear(); + }; + + /* Map bucket size → partition_id for heavy buckets. num_partitions <= 8 + so this loop is constant time. 
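The size-to-partition mapping used for heavy buckets follows the `partition_for_size` lambda above: doubling upper bounds starting at `2 * min_size`, with the last partition absorbing everything larger. A self-contained version (with `min_size` and `num_partitions` passed explicitly since the lambda captures them):

```cpp
#include <cassert>
#include <cstdint>

// Heavy buckets fall into at most num_partitions size classes with doubling
// upper bounds: (min_size, 2*min_size], (2*min_size, 4*min_size], ...
// The last partition absorbs everything larger.
uint64_t partition_for_size(uint64_t bucket_size, uint64_t min_size,
                            uint64_t num_partitions) {
    assert(bucket_size > min_size);  // only heavy buckets are partitioned
    uint64_t pid = 0;
    uint64_t upper = 2 * min_size;
    while (bucket_size > upper && pid + 1 < num_partitions) {
        upper *= 2;
        ++pid;
    }
    return pid;
}
```

With `min_size = 64` (i.e. `min_l = 6`, used here purely as an example value) and 4 partitions, sizes 65..128 map to partition 0, 129..256 to partition 1, and anything above 512 to partition 3.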
*/ + auto partition_for_size = [&](uint64_t bucket_size) -> uint64_t { + assert(bucket_size > min_size); + uint64_t pid = 0; + uint64_t upper = 2 * min_size; + while (bucket_size > upper && pid + 1 < num_partitions) { + upper *= 2; + ++pid; + } + return pid; + }; + + /* + Combined pass: stream the merged minimizers file once and, per + bucket, write the appropriate part of the sparse index DIRECTLY TO + DISK via streaming compact_vector writers (control_codewords and + heavy_load_buckets) or per-size raw-value tmp files (mid_load). + For heavy buckets we also emit kmer-extraction requests in-line. + + Buckets are visited in mphf-hash (= bucket_id) order, so writes to + control_codewords are strictly monotonic. heavy_load_cursor is also + monotonic across the whole pass. mid_load per-size cursors are + each monotonic within their size class. + */ { - bits::compact_vector::builder mid_load_buckets_builder; - bits::compact_vector::builder heavy_load_buckets_builder; - mid_load_buckets_builder.resize(num_minimizer_positions_of_buckets_larger_than_1, - num_bits_per_offset); - heavy_load_buckets_builder.resize(num_minimizer_positions_of_buckets_in_skew_index, - num_bits_per_offset); - - std::vector begin_buckets_of_size; - begin_buckets_of_size.resize(min_size + 1, 0); - - uint64_t curr_bucket_size = 2; - uint64_t list_id = 0; - uint64_t mid_load_buckets_size = 0; - uint64_t heavy_load_buckets_size = 0; - - uint64_t partition_id = 0; - uint64_t lower = min_size; - uint64_t upper = 2 * lower; - - for (auto bucket : buckets) { - const uint64_t bucket_size = bucket.size(); - assert(bucket_size >= 2); - - if (bucket_size > curr_bucket_size) { - while (bucket_size > curr_bucket_size) ++curr_bucket_size; - if (curr_bucket_size <= min_size) { - begin_buckets_of_size[curr_bucket_size] = mid_load_buckets_size; - } else { - while (curr_bucket_size > upper) { - lower = upper; - upper = 2 * lower; - partition_id += 1; - if (partition_id == num_partitions - 1) upper = 
max_bucket_size;
+        streaming_minimizer_bucket_reader reader;
+        reader.open(minimizers_filename);
+        std::vector<minimizer_tuple> bucket_buf;
+        while (reader.has_next_bucket()) {
+            const uint64_t bucket_id = reader.next_bucket(bucket_buf);
+            uint64_t bucket_size = 0;
+            {
+                uint64_t prev = constants::invalid_uint64;
+                for (auto const& mt : bucket_buf) {
+                    if (mt.pos_in_seq != prev) {
+                        ++bucket_size;
+                        prev = mt.pos_in_seq;
+                    }
                 }
-                list_id = 0;
             }
-            if (curr_bucket_size <= min_size) {
+
+            if (bucket_size == 1) {
+                /* Singleton: code = |offset|0|, LSB = 0. */
+                const uint64_t code = bucket_buf.front().pos_in_seq << 1;
+                assert(code < (uint64_t(1) << num_bits_for_control));
+                control_codewords_writer.set(bucket_id, code);
+            } else if (bucket_size <= min_size) {
+                /* Mid-load: write positions to per-size raw file at the
+                   per-size cursor; assign the next list_id for this size. */
+                const uint64_t list_id = list_id_per_size[bucket_size]++;
+                const uint64_t code =
+                    (((list_id << constants::min_l) | (bucket_size - 2)) << 2) | 1;
+                assert(code < (uint64_t(1) << num_bits_for_control));
+                control_codewords_writer.set(bucket_id, code);
+
+                auto& out = mid_load_per_size[bucket_size];
                 uint64_t prev_pos_in_seq = constants::invalid_uint64;
-                for (auto mt : bucket) {
-                    if (prev_pos_in_seq == constants::invalid_uint64) {  // only once
-                        uint64_t p = (list_id << constants::min_l) | (curr_bucket_size - 2);
-                        uint64_t code = (p << 2) | 1;  // first two LS bits encode status code: 01
-                        assert(code < (uint64_t(1) << num_bits_for_control));
-                        control_codewords_builder.set(mt.minimizer, code);
-                    }
+                for (auto const& mt : bucket_buf) {
                     if (mt.pos_in_seq != prev_pos_in_seq) {
-                        mid_load_buckets_builder.push_back(mt.pos_in_seq);
+                        const uint64_t v = mt.pos_in_seq;
+                        out.write(reinterpret_cast<char const*>(&v), sizeof(uint64_t));
                         prev_pos_in_seq = mt.pos_in_seq;
-                        mid_load_buckets_size += 1;
                     }
                 }
-                ++list_id;
             } else {
+                /* Heavy: write positions at the monotone heavy_load_cursor,
+                   set the codeword (encodes the start offset and 
partition + id), and emit kmer-extraction requests for each + super-kmer in the bucket. */ + const uint64_t partition_id = partition_for_size(bucket_size); + assert(partition_id < num_partitions); + const uint64_t bucket_begin = heavy_load_cursor; + const uint64_t code = (((bucket_begin << 3) | partition_id) << 2) | 3; + assert(code < (uint64_t(1) << num_bits_for_control)); + control_codewords_writer.set(bucket_id, code); + + uint32_t pos_in_bucket = uint32_t(-1); uint64_t prev_pos_in_seq = constants::invalid_uint64; - for (auto mt : bucket) { - if (prev_pos_in_seq == constants::invalid_uint64) { // only once - assert(partition_id < 8); - uint64_t p = (heavy_load_buckets_size << 3) | partition_id; - uint64_t code = (p << 2) | 3; // first two LS bits encode status code: 11 - assert(code < (uint64_t(1) << num_bits_for_control)); - control_codewords_builder.set(mt.minimizer, code); - } + for (auto const& mt : bucket_buf) { + num_kmers_in_partition[partition_id] += mt.num_kmers_in_super_kmer; if (mt.pos_in_seq != prev_pos_in_seq) { - heavy_load_buckets_builder.push_back(mt.pos_in_seq); + heavy_load_writer.set(heavy_load_cursor++, mt.pos_in_seq); prev_pos_in_seq = mt.pos_in_seq; - heavy_load_buckets_size += 1; + ++pos_in_bucket; } + assert(mt.pos_in_seq >= mt.pos_in_kmer); + const uint64_t abs_offset = + d.m_spss.strings_offsets.decode(mt.pos_in_seq).absolute_offset; + const uint64_t starting_pos = abs_offset - mt.pos_in_kmer; + if (request_buffer.size() == request_buffer_capacity) flush_request_buffer(); + request_buffer.emplace_back(starting_pos, uint32_t(partition_id), pos_in_bucket, + uint32_t(mt.num_kmers_in_super_kmer)); } } } + reader.close(); + flush_request_buffer(); + } - d.m_ssi.begin_buckets_of_size = std::move(begin_buckets_of_size); + /* Finalize the directly-streamed compact_vector files. 
*/
+    control_codewords_writer.finalize();
+    heavy_load_writer.finalize();
-        control_codewords_builder.build(d.m_ssi.codewords.control_codewords);
-        mid_load_buckets_builder.build(d.m_ssi.mid_load_buckets);
-        heavy_load_buckets_builder.build(d.m_ssi.ski.heavy_load_buckets);
+    /* Close per-size mid_load files. */
+    for (uint64_t s = 2; s <= min_size; ++s) {
+        if (mid_load_per_size[s].is_open()) mid_load_per_size[s].close();
     }
-    timer.stop();
-
-    build_stats.add("step 7.1 (build sparse index)", uint64_t(timer.elapsed()));
+    /* Concatenate per-size mid_load files in size order into the final
+       mid_load_buckets compact_vector file via the streaming writer.
+       Each per-size file holds raw uint64_t values written monotonically
+       within its size class; we just stream them through, packing into
+       num_bits_per_offset-bit fields at the precomputed begin offset for
+       each size. */
+    {
+        streaming_compact_vector_writer mid_load_writer;
+        mid_load_writer.open(spilled.mid_load_buckets_path,
+                             num_minimizer_positions_of_buckets_larger_than_1, num_bits_per_offset);
+        uint64_t global_index = 0;
+        for (uint64_t s = 2; s <= min_size; ++s) {
+            const uint64_t expected = buckets_stats.num_buckets_of_size(s) * s;
+            if (expected == 0) continue;
+            std::ifstream in(mid_load_per_size_path(s), std::ifstream::binary);
+            if (!in.is_open()) {
+                throw std::runtime_error("cannot reopen mid_load per-size tmp file");
+            }
+            for (uint64_t i = 0; i != expected; ++i) {
+                uint64_t v;
+                in.read(reinterpret_cast<char*>(&v), sizeof(uint64_t));
+                if (in.gcount() != static_cast<std::streamsize>(sizeof(uint64_t))) {
+                    throw std::runtime_error("mid_load per-size tmp file truncated");
+                }
+                mid_load_writer.set(global_index++, v);
+            }
+            in.close();
+            std::remove(mid_load_per_size_path(s).c_str());
+        }
+        mid_load_writer.finalize();
+    }
+
+    timer.stop();
+    build_stats.add("step 7.1 (build sparse index)",
+                    musec_as_seconds_str(uint64_t(timer.elapsed())).c_str());
 
     if (build_config.verbose) {
        print_time(uint64_t(timer.elapsed()), "step 7.1 
(build sparse index)"); }
-    timer.reset();
 
     if (num_buckets_in_skew_index == 0) {
@@ -258,228 +486,314 @@ void dictionary_builder::build_sparse_and_skew_index(
         return;
     }
 
-    /* step 2. build skew index */
-    timer.start();
-    std::vector<uint64_t> num_kmers_in_partition(num_partitions, 0);
-
-    {
-        uint64_t partition_id = 0;
-        uint64_t lower = min_size;
-        uint64_t upper = 2 * lower;
-        uint64_t num_kmers_in_skew_index = 0;
-
-        for (uint64_t i = buckets.size() - num_buckets_in_skew_index; i <= buckets.size(); ++i)  //
-        {
-            auto const& bucket = buckets[i];
-            while (i == buckets.size() or bucket.size() > upper)  //
-            {
-                if (build_config.verbose) {
-                    std::cout << "  partition = " << partition_id
-                              << ": num kmers in buckets of size > " << lower << " and <= " << upper
-                              << ": " << num_kmers_in_partition[partition_id] << std::endl;
-                }
+    /*
+        step 2. build skew index
 
-                num_kmers_in_skew_index += num_kmers_in_partition[partition_id];
+        Phases (B) and (C) below; phase (A) was folded into the combined
+        sparse pass above. Phase (B) extracts k-mers from `strings` in a
+        single forward sweep guided by the externally-sorted requests, and
+        phase (C) builds the per-partition MPHF + cvb_positions on disk
+        from the per-partition kmer files. 
+ */ + timer.start(); - if (i == buckets.size()) break; + if (build_config.verbose) { + uint64_t total_kmers_in_skew = 0; + for (uint64_t p = 0; p != num_partitions; ++p) { + total_kmers_in_skew += num_kmers_in_partition[p]; + std::cout << " partition = " << p + << ": num kmers in partition = " << num_kmers_in_partition[p] << std::endl; + } + std::cout << "num kmers in skew index = " << total_kmers_in_skew << " (" + << (total_kmers_in_skew * 100.0) / buckets_stats.num_kmers() << "%)" << std::endl; + } - lower = upper; - upper = 2 * lower; - partition_id += 1; - if (partition_id == num_partitions - 1) upper = max_bucket_size; + /* (B) sequential extraction over `strings` -> per-partition kmer tmp files */ + { + struct request_run_names_iterator { + request_run_names_iterator(std::string const& tmp_dirname, uint64_t skew_run_id) + : i(0), skew_run_id(skew_run_id), tmp_dirname(tmp_dirname) {} + + std::string operator*() const { + std::stringstream ss; + ss << tmp_dirname << "/sshash.tmp.run_" << skew_run_id << ".kmer_requests." 
<< i
+               << ".bin";
+            return ss.str();
+        }
+        void operator++() { ++i; }
+
+        uint64_t i;
+        uint64_t skew_run_id;
+        std::string tmp_dirname;
+    };
+
+    request_run_names_iterator names_it(build_config.tmp_dirname, skew_run_id);
+    file_merging_iterator<kmer_extraction_request> merger(names_it, num_request_runs.load());
+
+    std::vector<std::ofstream> partition_writers(num_partitions);
+    for (uint64_t p = 0; p != num_partitions; ++p) {
+        if (num_kmers_in_partition[p] == 0) continue;
+        partition_writers[p].open(skew_partition_filename(p),
+                                  std::ofstream::binary | std::ofstream::trunc);
+        if (!partition_writers[p].is_open()) {
+            throw std::runtime_error("cannot open skew-partition tmp file");
+        }
+    }
 
-            if (i == buckets.size()) break;
+    const uint64_t k = build_config.k;
+    const bool canonical = build_config.canonical;
+    auto strings_reader = strings_builder.make_reader();
+    kmer_iterator kmer_it(strings_reader, k);
 
-            assert(bucket.size() > lower and bucket.size() <= upper);
-            for (auto mt : bucket) {
-                num_kmers_in_partition[partition_id] += mt.num_kmers_in_super_kmer;
+    while (merger.has_next())  //
+    {
+        const kmer_extraction_request req = *merger;
+        kmer_it.at(Kmer::bits_per_char * req.starting_pos);
+        for (uint32_t i = 0; i != req.num_kmers_in_super_kmer; ++i) {
+            Kmer kmer = kmer_it.get();
+            if (canonical) {
+                Kmer kmer_rc = kmer;
+                kmer_rc.reverse_complement_inplace(k);
+                kmer = std::min(kmer, kmer_rc);
+            }
+            auto& w = partition_writers[req.partition_id];
+            skew_kmer_record_t rec{kmer.bits, req.pos_in_bucket};
+            w.write(reinterpret_cast<char const*>(&rec), sizeof(rec));
+            kmer_it.next();
         }
+        merger.next();
     }
-        assert(partition_id == num_partitions - 1);
+    merger.close();
 
-        if (build_config.verbose) {
-            std::cout << "num kmers in skew index = " << num_kmers_in_skew_index << " ("
-                      << (num_kmers_in_skew_index * 100.0) / buckets_stats.num_kmers() << "%)"
-                      << std::endl;
+    for (auto& w : partition_writers) {
+        if (w.is_open()) w.close();
     }
-        assert(num_kmers_in_skew_index == std::accumulate(num_kmers_in_partition.begin(),
-                                                          
num_kmers_in_partition.end(),
-                                                          uint64_t(0)));
+    for (uint64_t i = 0; i != num_request_runs.load(); ++i) {
+        std::remove(request_run_filename(i).c_str());
+    }
     }
 
+    /*
+        (C) per-partition MPHF + cvb_positions build, both on disk.
+
+        Per partition:
+        (1) Build MPHF in external memory by streaming the partition's
+            kmer file (pthash spills hashes to tmp_dirname under its own
+            ram budget).
+        (2) Stream-read the kmer file, compute F(kmer), emit
+            (F(kmer), pos_in_bucket) tuples to disk; external-sort by
+            F(kmer); stream sorted tuples through a
+            streaming_compact_vector_writer to produce the partition's
+            cvb_positions tmp file.
+
+        Only the MPHF itself is held in RAM (pthash returns it as an
+        in-memory struct); cvb_positions is fully spilled.
+    */
     {
+        spilled.skew_positions_paths.assign(num_partitions, std::string());
+        spilled.skew_mphfs_paths.assign(num_partitions, std::string());
         std::vector<kmers_pthash_type> mphfs;
-        std::vector<bits::compact_vector> positions;
         mphfs.resize(num_partitions);
-        positions.resize(num_partitions);
 
         pthash::build_configuration mphf_build_config;
-        mphf_build_config.lambda =
-            build_config.lambda + 2.0; /* Use higher lambda here since we have less keys. 
*/ + mphf_build_config.lambda = build_config.lambda + 2.0; mphf_build_config.alpha = 0.94; mphf_build_config.seed = util::get_seed_for_hash_function(build_config); mphf_build_config.verbose = false; - mphf_build_config.num_threads = build_config.num_threads; - mphf_build_config.avg_partition_size = constants::avg_partition_size; + util::configure_mphf_threads_and_partition(mphf_build_config, build_config.num_threads, + build_config.ram_limit_in_GiB, + build_config.verbose, "skew partition MPHF"); + mphf_build_config.ram = (build_config.ram_limit_in_GiB * essentials::GiB) / 2; + mphf_build_config.tmp_dir = build_config.tmp_dirname; + + const uint64_t pos_run_basename_id = pthash::clock_type::now().time_since_epoch().count(); + auto pos_run_filename = [&](uint64_t partition_id, uint64_t id) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << pos_run_basename_id + << ".pos_runs.p" << partition_id << "." << id << ".bin"; + return ss.str(); + }; + auto skew_positions_filename = [&](uint64_t partition_id) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << pos_run_basename_id + << ".skew_positions.p" << partition_id << ".bin"; + return ss.str(); + }; + auto skew_mphf_filename = [&](uint64_t partition_id) { + std::stringstream ss; + ss << build_config.tmp_dirname << "/sshash.tmp.run_" << pos_run_basename_id + << ".skew_mphf.p" << partition_id << ".bin"; + return ss.str(); + }; + + /* Capped at ram_limit/8: this buffer is alive during phase C + alongside pthash's parallel-build memory and the currently- + building partition's MPHF, so it has to share the RAM budget. 
*/
+        const uint64_t pos_buffer_capacity = std::max(
+            uint64_t(1) << 16,
+            (build_config.ram_limit_in_GiB * essentials::GiB) / (8 * sizeof(position_tuple)));
 
-        uint64_t partition_id = 0;
         uint64_t lower = min_size;
         uint64_t upper = 2 * lower;
         uint64_t num_bits_per_pos = constants::min_l + 1;
+        if (num_partitions == 1) {
+            upper = max_bucket_size;
+            num_bits_per_pos = log2_max_bucket_size;
+        }
 
-        /* Temporary storage for kmers and positions within a partition. */
-        std::vector<Kmer> kmers;
-        std::vector<uint32_t> positions_in_bucket;
-        bits::compact_vector::builder cvb_positions;
-        kmers.reserve(num_kmers_in_partition[partition_id]);
-        positions_in_bucket.reserve(num_kmers_in_partition[partition_id]);
-        cvb_positions.resize(num_kmers_in_partition[partition_id], num_bits_per_pos);
-
-        for (uint64_t i = buckets.size() - num_buckets_in_skew_index, k = build_config.k;
-             i <= buckets.size(); ++i)  //
+        for (uint64_t partition_id = 0; partition_id != num_partitions; ++partition_id)  //
         {
-            auto const& bucket = buckets[i];
-            while (i == buckets.size() or bucket.size() > upper)  //
+            const uint64_t n = num_kmers_in_partition[partition_id];
+
+            if (build_config.verbose) {
+                std::cout << "  lower = " << lower << "; upper = " << upper
+                          << "; num_bits_per_pos = " << num_bits_per_pos
+                          << "; num_kmers_in_partition = " << n << std::endl;
+            }
+
+            if (n > 0)  //
             {
+                const std::string kmer_fn = skew_partition_filename(partition_id);
+
                 if (build_config.verbose) {
-                    std::cout << "  lower = " << lower << "; upper = " << upper
-                              << "; num_bits_per_pos = " << num_bits_per_pos
-                              << "; num_kmers_in_partition = " << kmers.size() << std::endl;
+                    const uint64_t avg_partition_size =
+                        pthash::compute_avg_partition_size(n, mphf_build_config);
+                    const uint64_t pthash_num_partitions =
+                        pthash::compute_num_partitions(n, avg_partition_size);
+                    assert(pthash_num_partitions > 0);
+                    std::cout << "  building MPHF (external memory) with "
+                              << mphf_build_config.num_threads << " threads and "
+                              << pthash_num_partitions
+                              << " 
partitions (avg. partition size = " << avg_partition_size + << ")..." << std::endl; } - assert(num_kmers_in_partition[partition_id] == kmers.size()); - assert(num_kmers_in_partition[partition_id] == positions_in_bucket.size()); - if (num_kmers_in_partition[partition_id] > 0) // + /* (1) Build the MPHF by streaming kmers from the partition file. */ + auto& F = mphfs[partition_id]; { - /*******/ - // { - // uint64_t RAM_available_in_bytes = essentials::GiB; - - // uint64_t RAM_taken_in_bytes = essentials::vec_bytes(buckets) + - // essentials::vec_bytes(tuples) + - - // essentials::vec_bytes(kmers) + - // essentials::vec_bytes(positions_in_bucket) - // + - // essentials::vec_bytes(cvb_positions.data()) - // + - - // d.num_bits() / 8; // current memory - - // std::cout << "RAM_taken_in_bytes = " << RAM_taken_in_bytes << std::endl; - - // const uint64_t RAM_limit_in_bytes = - // build_config.ram_limit_in_GiB * essentials::GiB; - - // if (RAM_limit_in_bytes > RAM_taken_in_bytes) { - // RAM_available_in_bytes = std::max( - // RAM_limit_in_bytes - RAM_taken_in_bytes, RAM_available_in_bytes); - // } - // std::cout << "RAM_available_in_bytes = " << RAM_available_in_bytes - // << std::endl; - - // mphf_build_config.ram = RAM_available_in_bytes / 2; // at least 0.5 GB - // } - /*******/ - - if (build_config.verbose) { - const uint64_t avg_partition_size = - pthash::compute_avg_partition_size(kmers.size(), mphf_build_config); - const uint64_t num_partitions = - pthash::compute_num_partitions(kmers.size(), avg_partition_size); - assert(num_partitions > 0); - std::cout << " building MPHF with " << mphf_build_config.num_threads - << " threads and " << num_partitions - << " partitions (avg. partition size = " << avg_partition_size - << ")..." 
<< std::endl;
-                }
-
-                auto& F = mphfs[partition_id];
-                F.build_in_internal_memory(kmers.begin(), kmers.size(), mphf_build_config);
+                    skew_partition_kmer_iterator iter;
+                    iter.open(kmer_fn);
+                    F.build_in_external_memory(iter, n, mphf_build_config);
+                    iter.close();
+                }
 
-                if (build_config.verbose) {
-                    std::cout << "  built mphs[" << partition_id << "] for " << kmers.size()
-                              << " kmers; bits/key = "
-                              << static_cast<double>(F.num_bits()) / F.num_keys() << std::endl;
-                }
+                if (build_config.verbose) {
+                    std::cout << "  built mphfs[" << partition_id << "] for " << F.num_keys()
+                              << " kmers; bits/key = "
+                              << static_cast<double>(F.num_bits()) / F.num_keys() << std::endl;
+                }
 
-                for (uint64_t i = 0; i != kmers.size(); ++i) {
-                    Kmer kmer = kmers[i];
-                    uint64_t pos = F(kmer);
-                    uint32_t pos_in_bucket = positions_in_bucket[i];
-                    cvb_positions.set(pos, pos_in_bucket);
+                /* (2a) Stream-read kmer file, compute F(kmer), externally
+                   sort (F(kmer), pos_in_bucket) tuples by F(kmer). */
+                std::atomic<uint64_t> pos_num_runs{0};
+                {
+                    std::vector<position_tuple> pos_buffer;
+                    pos_buffer.reserve(pos_buffer_capacity);
+                    auto flush_pos_buffer = [&]() {
+                        if (pos_buffer.empty()) return;
+                        parallel_sort(pos_buffer, build_config.num_threads,
+                                      [](position_tuple const& a, position_tuple const& b) {
+                                          return a.mphf_pos < b.mphf_pos;
+                                      });
+                        const uint64_t id = pos_num_runs.fetch_add(1);
+                        std::ofstream out(pos_run_filename(partition_id, id),
+                                          std::ofstream::binary);
+                        if (!out.is_open()) {
+                            throw std::runtime_error("cannot open positions tuple run file");
+                        }
+                        out.write(reinterpret_cast<char const*>(pos_buffer.data()),
+                                  pos_buffer.size() * sizeof(position_tuple));
+                        out.close();
+                        pos_buffer.clear();
+                    };
+
+                    buffered_record_stream<skew_kmer_record_t> rec_stream;
+                    rec_stream.open(kmer_fn);
+                    for (uint64_t i = 0; i != n; ++i) {
+                        assert(!rec_stream.empty());
+                        auto const& rec = rec_stream.current();
+                        Kmer kmer;
+                        kmer.bits = rec.kmer_bits;
+                        const uint64_t pos = F(kmer);
+                        if (pos_buffer.size() == pos_buffer_capacity) flush_pos_buffer();
+                        
pos_buffer.emplace_back(pos, rec.pib);
+                        rec_stream.advance();
                     }
-                    auto& P = positions[partition_id];
-                    cvb_positions.build(P);
+                    rec_stream.close();
+                    std::remove(kmer_fn.c_str());
+                    flush_pos_buffer();
+                }
 
-                if (build_config.verbose) {
-                    std::cout << "  built positions[" << partition_id << "] for " << P.size()
-                              << " kmers; bits/key = " << (P.num_bytes() * 8.0) / P.size()
-                              << std::endl;
+                /* (2b) Stream sorted tuples through the streaming
+                   compact_vector writer to produce the partition's
+                   cvb_positions tmp file. */
+                {
+                    spilled.skew_positions_paths[partition_id] =
+                        skew_positions_filename(partition_id);
+                    streaming_compact_vector_writer pos_writer;
+                    pos_writer.open(spilled.skew_positions_paths[partition_id], n,
+                                    num_bits_per_pos);
+
+                    struct pos_run_names_iterator {
+                        pos_run_names_iterator(uint64_t partition_id,
+                                               std::function<std::string(uint64_t, uint64_t)> fn)
+                            : i(0), partition_id(partition_id), fn(std::move(fn)) {}
+                        std::string operator*() { return fn(partition_id, i); }
+                        void operator++() { ++i; }
+                        uint64_t i;
+                        uint64_t partition_id;
+                        std::function<std::string(uint64_t, uint64_t)> fn;
+                    };
+                    pos_run_names_iterator names_it(partition_id, pos_run_filename);
+                    file_merging_iterator<position_tuple> merger(names_it, pos_num_runs.load());
+                    while (merger.has_next()) {
+                        position_tuple pt = *merger;
+                        pos_writer.set(pt.mphf_pos, pt.pib);
+                        merger.next();
                     }
+                    merger.close();
+                    pos_writer.finalize();
+                }
 
-                if (i == buckets.size()) break;
+                /* Cleanup the position-tuple run files. 
*/ + for (uint64_t i = 0; i != pos_num_runs.load(); ++i) { + std::remove(pos_run_filename(partition_id, i).c_str()); + } - lower = upper; - upper = 2 * lower; - num_bits_per_pos += 1; - partition_id += 1; - if (partition_id == num_partitions - 1) { - upper = max_bucket_size; - num_bits_per_pos = log2_max_bucket_size; + if (build_config.verbose) { + std::cout << " built positions[" << partition_id << "] for " << n + << " kmers; bits/key = " << num_bits_per_pos << std::endl; } - kmers.clear(); - positions_in_bucket.clear(); - kmers.reserve(num_kmers_in_partition[partition_id]); - positions_in_bucket.reserve(num_kmers_in_partition[partition_id]); - cvb_positions.resize(num_kmers_in_partition[partition_id], num_bits_per_pos); + /* Spill the partition's MPHF to disk (no longer needed + during build) and free its in-RAM footprint. The + accumulating skew MPHFs were the dominant resident + memory at the end of phase C. */ + spilled.skew_mphfs_paths[partition_id] = skew_mphf_filename(partition_id); + essentials::save(F, spilled.skew_mphfs_paths[partition_id].c_str()); + F = kmers_pthash_type{}; } - if (i == buckets.size()) break; - - assert(bucket.size() > lower and bucket.size() <= upper); - uint64_t pos_in_bucket = -1; - uint64_t prev_pos_in_seq = constants::invalid_uint64; - for (auto mt : bucket) // - { - if (mt.pos_in_seq != prev_pos_in_seq) { - prev_pos_in_seq = mt.pos_in_seq; - ++pos_in_bucket; - } - assert(mt.pos_in_seq >= mt.pos_in_kmer); - - mt.pos_in_seq = d.m_spss.strings_offsets.decode(mt.pos_in_seq).absolute_offset; - - const uint64_t starting_pos_of_super_kmer = mt.pos_in_seq - mt.pos_in_kmer; - kmer_iterator it( - d.m_spss.strings, k, Kmer::bits_per_char * starting_pos_of_super_kmer); - for (uint64_t i = 0; i != mt.num_kmers_in_super_kmer; ++i) { - auto kmer = it.get(); - if (build_config.canonical) { /* take the canonical kmer */ - auto kmer_rc = kmer; - kmer_rc.reverse_complement_inplace(k); - kmer = std::min(kmer, kmer_rc); - } - kmers.push_back(kmer); - 
positions_in_bucket.push_back(pos_in_bucket); - it.next(); - } - assert(pos_in_bucket < (1ULL << cvb_positions.width())); + /* advance partition state for the next iteration */ + lower = upper; + upper = 2 * lower; + num_bits_per_pos += 1; + if (partition_id + 1 == num_partitions - 1) { + upper = max_bucket_size; + num_bits_per_pos = log2_max_bucket_size; } } - assert(partition_id == num_partitions - 1); d.m_ssi.ski.mphfs = std::move(mphfs); - d.m_ssi.ski.positions = std::move(positions); + /* d.m_ssi.ski.positions stays empty here; it will be populated + either by step 8 (materialize) or substituted at stream-save. */ } timer.stop(); - build_stats.add("step 7.2 (build skew index)", uint64_t(timer.elapsed())); + build_stats.add("step 7.2 (build skew index)", + musec_as_seconds_str(uint64_t(timer.elapsed())).c_str()); if (build_config.verbose) { print_time(uint64_t(timer.elapsed()), "step 7.2 (build skew index)"); diff --git a/src/builder/compute_minimizer_tuples.cpp b/src/builder/compute_minimizer_tuples.cpp index 94916d6..a3e98ec 100644 --- a/src/builder/compute_minimizer_tuples.cpp +++ b/src/builder/compute_minimizer_tuples.cpp @@ -47,15 +47,21 @@ void dictionary_builder::compute_minimizer_tuples() // const uint64_t index_end = std::min(index_begin + num_sequences_per_thread, num_sequences); - kmer_iterator kmer_it(strings_builder, k); + auto strings_reader = strings_builder.make_reader(); + kmer_iterator kmer_it(strings_reader, k); + /* Per-thread forward reader over the offsets file, positioned + so the first `next()` returns offsets[index_begin]. 
*/
+            auto offsets_reader = strings_offsets_builder.make_reader(index_begin);
+            uint64_t prev_offset = offsets_reader.next();  // == offsets[index_begin]
 
             hasher_type hasher(build_config.seed);
             minimizer_iterator minimizer_it(k, m, hasher);
             minimizer_iterator_rc minimizer_it_rc(k, m, hasher);
 
             for (uint64_t i = index_begin; i < index_end; ++i)  //
             {
-                const uint64_t begin = strings_offsets_builder[i];
-                const uint64_t end = strings_offsets_builder[i + 1];
+                const uint64_t begin = prev_offset;
+                const uint64_t end = offsets_reader.next();  // offsets[i + 1]
+                prev_offset = end;
                 const uint64_t sequence_len = end - begin;
                 assert(sequence_len >= k);
diff --git a/tools/build.cpp b/tools/build.cpp
index 6630386..e6e3f67 100644
--- a/tools/build.cpp
+++ b/tools/build.cpp
@@ -73,12 +73,33 @@ int build(int argc, char** argv) {
 
     // build_config.print();
 
-    essentials::logger("building data structure...");
-    dictionary_type dict;
-    dict.build(input_filename, build_config);
-
     bool check = parser.get<bool>("check");
+    bool has_output = parser.parsed("output_filename");
+
+    /* Always build via the streaming-save path: peak RAM is bounded by
+       the build phase only. If the caller didn't pass -o, write to a
+       tmp file in `tmp_dirname` and delete it after the build (or after
+       the --check verification). 
*/
+    std::string output_filename;
+    if (has_output) {
+        output_filename = parser.get<std::string>("output_filename");
+    } else {
+        std::stringstream ss;
+        ss << build_config.tmp_dirname << "/sshash.tmp.run_"
+           << pthash::clock_type::now().time_since_epoch().count() << ".index.bin";
+        output_filename = ss.str();
+    }
+
+    {
+        dictionary_type dict;
+        essentials::logger("building data structure...");
+        dict.build(input_filename, build_config, output_filename);
+        essentials::logger("DONE");
+    }
+
+    if (check) {
+        dictionary_type dict;
+        open_dictionary(dict, output_filename, /*mmap=*/true, build_config.verbose);
         check_correctness_lookup_access(dict, input_filename);
         check_correctness_navigational_kmer_query(dict, input_filename);
         check_correctness_navigational_string_query(dict);
@@ -87,12 +108,7 @@ int build(int argc, char** argv) {
         check_correctness_string_iterator(dict);
     }
 
-    if (parser.parsed("output_filename")) {
-        auto output_filename = parser.get<std::string>("output_filename");
-        essentials::logger("saving data structure to disk...");
-        essentials::save(dict, output_filename.c_str());
-        essentials::logger("DONE");
-    }
+    if (!has_output) std::remove(output_filename.c_str());
 
     return 0;
 }
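
---

The combined pass distinguishes the three control-codeword layouts by their low bits: status `0` (LSB) for singletons, `01` for mid-load, `11` for heavy buckets. A minimal standalone sketch of the encode/decode arithmetic, mirroring the shifts in the diff above; the concrete field widths (`min_l = 6`, 3 partition bits) are illustrative assumptions, not the build's actual constants:

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for constants::min_l: width of the (bucket_size - 2) field.
constexpr uint64_t min_l = 6;

// Singleton bucket: |offset|0| -- LSB 0 marks the singleton case.
uint64_t encode_singleton(uint64_t pos_in_seq) { return pos_in_seq << 1; }

// Mid-load bucket: |list_id|size-2|01| -- status bits 01.
uint64_t encode_mid(uint64_t list_id, uint64_t bucket_size) {
    return (((list_id << min_l) | (bucket_size - 2)) << 2) | 1;
}

// Heavy bucket: |bucket_begin|partition_id|11| -- status bits 11,
// 3 bits of partition id (num_partitions <= 8).
uint64_t encode_heavy(uint64_t bucket_begin, uint64_t partition_id) {
    return (((bucket_begin << 3) | partition_id) << 2) | 3;
}

struct decoded {
    int status;   // 0 = singleton, 1 = mid-load, 3 = heavy
    uint64_t a;   // offset / list_id / bucket_begin
    uint64_t b;   // unused / bucket_size / partition_id
};

decoded decode(uint64_t code) {
    if ((code & 1) == 0) return {0, code >> 1, 0};  // singleton: offset only
    const uint64_t payload = code >> 2;
    if ((code & 3) == 1)                            // mid-load: list_id + size
        return {1, payload >> min_l, (payload & ((uint64_t(1) << min_l) - 1)) + 2};
    return {3, payload >> 3, payload & 7};          // heavy: begin + partition
}
```

Because the two status bits are in the least-significant positions, a query can branch on `code & 3` before paying for any further decoding.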
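Several steps above (kmer-extraction request runs, position-tuple runs, and the minimizer tuples of steps 2-3) follow the same spill/sort/merge shape: fill a bounded buffer, sort it, flush it as a run, then k-way merge the sorted runs. A minimal in-memory sketch of the pattern, with plain vectors standing in for the tmp-file runs:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Bounded-buffer external-sort sketch: the buffer never exceeds `capacity`,
// so peak memory is fixed regardless of how many values are pushed.
struct external_sorter {
    explicit external_sorter(size_t capacity) : capacity_(capacity) {}

    void push(uint64_t v) {
        if (buffer_.size() == capacity_) flush();
        buffer_.push_back(v);
    }

    // Sort the current buffer and emit it as a new run.
    void flush() {
        if (buffer_.empty()) return;
        std::sort(buffer_.begin(), buffer_.end());
        runs_.push_back(std::move(buffer_));
        buffer_.clear();
    }

    // K-way merge all sorted runs with a min-heap of (value, run) pairs.
    std::vector<uint64_t> merge() {
        flush();
        using heap_item = std::pair<uint64_t, size_t>;
        std::priority_queue<heap_item, std::vector<heap_item>, std::greater<>> heap;
        std::vector<size_t> cursor(runs_.size(), 0);
        for (size_t r = 0; r != runs_.size(); ++r)
            if (!runs_[r].empty()) heap.emplace(runs_[r][0], r);
        std::vector<uint64_t> out;
        while (!heap.empty()) {
            auto [v, r] = heap.top();
            heap.pop();
            out.push_back(v);
            if (++cursor[r] < runs_[r].size()) heap.emplace(runs_[r][cursor[r]], r);
        }
        return out;
    }

    size_t num_runs() const { return runs_.size(); }

private:
    size_t capacity_;
    std::vector<uint64_t> buffer_;
    std::vector<std::vector<uint64_t>> runs_;
};
```

In the build the runs live in tmp files under `-d` and the merge streams through small read buffers, so the only RAM cost is the bounded write buffer plus one read cursor per run.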
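The `partition_for_size` helper in the combined pass maps a heavy bucket's size to its skew-index partition by doubling an upper bound: partition `p` holds buckets of size in `(min_size * 2^p, min_size * 2^(p+1)]`, with the last partition absorbing everything larger. A standalone sketch of that geometry (the `min_size` and `num_partitions` values in the test are illustrative, not the build's configuration):

```cpp
#include <cstdint>

// Doubling-range partition map for heavy buckets: returns the index of the
// first range whose upper bound covers bucket_size, clamped to the last
// partition. Mirrors the lambda in the combined pass.
uint64_t partition_for_size(uint64_t bucket_size, uint64_t min_size,
                            uint64_t num_partitions) {
    uint64_t pid = 0;
    uint64_t upper = 2 * min_size;
    while (bucket_size > upper && pid + 1 < num_partitions) {
        upper *= 2;
        ++pid;
    }
    return pid;
}
```

Since `num_partitions <= 8`, the loop runs at most seven times, which is why the pass can afford to call it once per heavy bucket.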