Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1757,12 +1757,19 @@ DEFINE_String(ann_index_ivf_list_cache_limit, "70%");
// Stale sweep time for ANN index IVF list cache in seconds. 3600s is 1 hour.
DEFINE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec, "3600");

// Chunk size for ANN/vector index building per training/adding batch
// Number of rows per add batch when building an ANN/vector index; also the number of
// rows buffered in memory before spilling.
// Defaults to 1,000,000 rows.
DEFINE_mInt64(ann_index_build_chunk_size, "1000000");
// Only strictly positive values are accepted.
DEFINE_Validator(ann_index_build_chunk_size,
[](const int64_t config) -> bool { return config > 0; });

// Upper bound on the number of rows sampled for ANN/vector index training.
// The effective sample size is never smaller than the minimum number of training rows
// required by the index, even when this value is lower.
// Defaults to 1,000,000 rows.
DEFINE_mInt64(ann_index_build_max_train_rows, "1000000");
// Only strictly positive values are accepted.
DEFINE_Validator(ann_index_build_max_train_rows,
[](const int64_t config) -> bool { return config > 0; });

DEFINE_mBool(enable_wal_tde, "false");

DEFINE_mBool(print_stack_when_cache_miss, "false");
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1809,8 +1809,10 @@ DECLARE_mInt32(max_segment_partial_column_cache_size);
DECLARE_String(ann_index_ivf_list_cache_limit);
// Stale sweep time for ANN index IVF list cache in seconds.
DECLARE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec);
// Chunk size for ANN/vector index building per training/adding batch
// Number of rows per add batch when building an ANN/vector index; also the number of
// rows buffered in memory before spilling.
DECLARE_mInt64(ann_index_build_chunk_size);
// Upper bound on the number of rows sampled for ANN/vector index training.
DECLARE_mInt64(ann_index_build_max_train_rows);

DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction);
DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction);
Expand Down
291 changes: 223 additions & 68 deletions be/src/storage/index/ann/ann_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

#include "storage/index/ann/ann_index_writer.h"

#include <fmt/format.h>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <string>

#include "common/cast_set.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "storage/index/ann/faiss_ann_index.h"
#include "storage/index/inverted/inverted_index_fs_directory.h"
#include "util/slice.h"
#include "util/uid_util.h"

namespace doris::segment_v2 {
static std::string get_or_default(const std::map<std::string, std::string>& properties,
Expand All @@ -39,7 +48,9 @@ AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer,
const TabletIndex* index_meta)
: _index_file_writer(index_file_writer), _index_meta(index_meta) {}

AnnIndexColumnWriter::~AnnIndexColumnWriter() {}
// Cleans up the temporary build spool file when the writer is destroyed.
// _delete_spool_file() closes any open spool writer and deletes the file if a path was
// recorded; failures there are only logged.
AnnIndexColumnWriter::~AnnIndexColumnWriter() {
_delete_spool_file();
}

Status AnnIndexColumnWriter::init() {
Result<std::shared_ptr<DorisFSDirectory>> compound_dir = _index_file_writer->open(_index_meta);
Expand Down Expand Up @@ -77,8 +88,15 @@ Status AnnIndexColumnWriter::init() {
index_type, build_parameter.dim, metric_type, build_parameter.max_degree,
build_parameter.ef_construction, quantizer);

size_t block_size = AnnIndexColumnWriter::chunk_size() * build_parameter.dim;
_float_array.reserve(block_size);
const size_t chunk_elements = AnnIndexColumnWriter::chunk_size() * build_parameter.dim;
const Int64 min_train_rows = _vector_index->get_min_train_rows();
if (min_train_rows > 0) {
_training_sample.reserve(_training_sample_rows_limit(min_train_rows) *
build_parameter.dim);
}
_buffered_vectors.reserve(chunk_elements);
_training_sample_seen_rows = 0;
_training_sample_rng.seed(0);

return Status::OK();
}
Expand All @@ -87,7 +105,11 @@ Status AnnIndexColumnWriter::add_values(const std::string fn, const void* values
return Status::OK();
}

void AnnIndexColumnWriter::close_on_error() {}
// Discards all build state after a failure: empties the in-memory vector buffers and
// removes the temporary spool file (if any).
void AnnIndexColumnWriter::close_on_error() {
    // The in-memory buffers and the spool file are independent of each other.
    _buffered_vectors.clear();
    _training_sample.clear();
    _delete_spool_file();
}

Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* value_ptr,
const uint8_t* null_map, const uint8_t* offsets_ptr,
Expand All @@ -109,26 +131,13 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val

const float* p = reinterpret_cast<const float*>(value_ptr);

const size_t full_elements = AnnIndexColumnWriter::chunk_size() * dim;
size_t remaining_elements = num_rows * dim;
size_t src_offset = 0;
while (remaining_elements > 0) {
size_t available_space = full_elements - _float_array.size();
size_t elements_to_add = std::min(remaining_elements, available_space);

_float_array.insert(_float_array.end(), p + src_offset, p + src_offset + elements_to_add);
src_offset += elements_to_add;
remaining_elements -= elements_to_add;

if (_float_array.size() == full_elements) {
RETURN_IF_ERROR(
_vector_index->train(AnnIndexColumnWriter::chunk_size(), _float_array.data()));
RETURN_IF_ERROR(
_vector_index->add(AnnIndexColumnWriter::chunk_size(), _float_array.data()));
_float_array.clear();
_need_save_index = true;
}
const Int64 min_train_rows = _vector_index->get_min_train_rows();
if (min_train_rows == 0) {
RETURN_IF_ERROR(_append_vectors_no_train(p, num_rows));
} else {
RETURN_IF_ERROR(_append_vectors_need_train(p, num_rows, min_train_rows));
}
_total_rows += cast_set<int64_t>(num_rows);

return Status::OK();
}
Expand All @@ -146,54 +155,200 @@ int64_t AnnIndexColumnWriter::size() const {
}

Status AnnIndexColumnWriter::finish() {
Int64 min_train_rows = _vector_index->get_min_train_rows();

// Check if we have enough rows to train the index
// train/add the remaining data
if (_float_array.empty()) {
if (_need_save_index) {
return _vector_index->save(_dir.get());
} else {
// No data was added at all. This can happen if the segment has 0 rows
// or all rows were filtered out. We need to delete the directory entry
// to avoid writing an empty/invalid index file.
LOG_INFO("No data to train/add for ANN index. Skipping index building.");
return _index_file_writer->delete_index(_index_meta);
if (_total_rows == 0) {
LOG_INFO("No data to train/add for ANN index. Skipping index building.");
Status st = _index_file_writer->delete_index(_index_meta);
_delete_spool_file();
return st;
}

const Int64 min_train_rows = _vector_index->get_min_train_rows();
Status st = min_train_rows == 0 ? _vector_index->save(_dir.get())
: _train_and_add(min_train_rows);
_delete_spool_file();
return st;
}

// Adds `num_rows` vectors straight to the index. Used when the index reports that it
// needs no training rows, so nothing has to be buffered or sampled.
//
// vectors:  pointer to the first float of the first vector; must not be null.
// num_rows: number of vectors to add; must be > 0.
// Returns the status of the underlying index add().
Status AnnIndexColumnWriter::_append_vectors_no_train(const float* vectors, size_t num_rows) {
    DCHECK(vectors != nullptr);
    DCHECK(num_rows > 0);
    const auto row_count = cast_set<Int64>(num_rows);
    return _vector_index->add(row_count, vectors);
}

// Stages `num_rows` vectors for an index that must be trained before vectors are added.
//
// Every incoming row is first offered to the training sample. The raw vectors are then
// kept in `_buffered_vectors` while they fit within chunk_size() rows; once they no
// longer fit, the buffer is spilled to a spool file and this and all later batches are
// appended to that file.
//
// vectors:        pointer to the first float of the first vector; must not be null.
// num_rows:       number of vectors in this batch; must be > 0.
// min_train_rows: minimum training rows required by the index (used to size the sample).
Status AnnIndexColumnWriter::_append_vectors_need_train(const float* vectors, size_t num_rows,
                                                        Int64 min_train_rows) {
    DCHECK(vectors != nullptr);
    DCHECK(num_rows > 0);

    const size_t dim = _vector_index->get_dimension();
    const size_t incoming_elements = num_rows * dim;
    const size_t sample_rows_limit = _training_sample_rows_limit(min_train_rows);
    _sample_training_vectors(vectors, num_rows, dim, sample_rows_limit);

    const bool already_spilled = _spool_file_writer != nullptr;
    if (!already_spilled) {
        const size_t buffer_capacity = AnnIndexColumnWriter::chunk_size() * dim;
        const bool fits_in_memory = _buffered_vectors.size() + incoming_elements <= buffer_capacity;
        if (fits_in_memory) {
            _buffered_vectors.insert(_buffered_vectors.end(), vectors,
                                     vectors + incoming_elements);
            return Status::OK();
        }
        // The buffer would overflow: move what is buffered to disk before writing the batch.
        RETURN_IF_ERROR(_spill_buffered_vectors());
    }

    return _append_to_spool_file(vectors, incoming_elements);
}

// Computes how many rows the training sample may hold:
//   max(min_train_rows, min(chunk_size(), config::ann_index_build_max_train_rows)).
// The configured cap is therefore ignored when it is below the index's own minimum.
//
// min_train_rows: minimum training rows required by the index; must be > 0.
size_t AnnIndexColumnWriter::_training_sample_rows_limit(Int64 min_train_rows) const {
    DCHECK(min_train_rows > 0);
    // Read the mutable config once so both comparisons see the same value.
    const Int64 configured_cap = config::ann_index_build_max_train_rows;
    Int64 limit = AnnIndexColumnWriter::chunk_size();
    if (configured_cap < limit) {
        limit = configured_cap;
    }
    if (limit < min_train_rows) {
        limit = min_train_rows;
    }
    return cast_set<size_t>(limit);
}

// Offers each incoming row to the training sample using reservoir sampling.
//
// While the sample holds fewer than `sample_rows_limit` rows, every row is appended.
// After that, for the k-th row seen overall, a slot index is drawn uniformly from
// [0, k - 1]; the row overwrites that slot only if the index falls inside the sample.
//
// vectors:           pointer to `num_rows * dim` floats; must not be null.
// num_rows:          number of rows in this batch; must be > 0.
// dim:               floats per row; must be > 0.
// sample_rows_limit: maximum number of rows kept in `_training_sample`; must be > 0.
void AnnIndexColumnWriter::_sample_training_vectors(const float* vectors, size_t num_rows,
                                                    size_t dim, size_t sample_rows_limit) {
    DCHECK(vectors != nullptr);
    DCHECK(num_rows > 0);
    DCHECK(dim > 0);
    DCHECK(sample_rows_limit > 0);
    DCHECK(_training_sample.size() % dim == 0);

    const float* row_begin = vectors;
    for (size_t i = 0; i != num_rows; ++i, row_begin += dim) {
        ++_training_sample_seen_rows;

        const bool reservoir_full = _training_sample.size() / dim >= sample_rows_limit;
        if (!reservoir_full) {
            // Fill phase: keep every row until the sample reaches its limit.
            _training_sample.insert(_training_sample.end(), row_begin, row_begin + dim);
        } else {
            // Replacement phase: the draw range grows with the number of rows seen.
            std::uniform_int_distribution<uint64_t> slot_picker(0, _training_sample_seen_rows - 1);
            const uint64_t slot = slot_picker(_training_sample_rng);
            if (slot < sample_rows_limit) {
                std::copy_n(row_begin, dim, _training_sample.data() + cast_set<size_t>(slot) * dim);
            }
        }
    }
}

// Appends `num_elements` floats, as raw bytes, to the open spool file writer.
// The caller is responsible for having created `_spool_file_writer` beforehand.
//
// vectors:      pointer to the floats to write.
// num_elements: number of floats (not rows, not bytes).
// Returns the status of the writer's append().
Status AnnIndexColumnWriter::_append_to_spool_file(const float* vectors, size_t num_elements) {
    const Slice payload(reinterpret_cast<const uint8_t*>(vectors), num_elements * sizeof(float));
    return _spool_file_writer->append(payload);
}

// Switches from in-memory buffering to a spool file.
//
// Creates a uniquely named "ann_index_build_<uid>.spool" file under a temporary file
// directory obtained from ExecEnv, opens a writer on it without data sync, and moves
// whatever is in `_buffered_vectors` into the file. Must be called only while no spool
// writer exists.
Status AnnIndexColumnWriter::_spill_buffered_vectors() {
    DCHECK(_spool_file_writer == nullptr);
    DORIS_CHECK(ExecEnv::GetInstance()->get_tmp_file_dirs() != nullptr);

    const std::string spool_name =
            fmt::format("ann_index_build_{}.spool", UniqueId::gen_uid().to_string());
    _spool_file_path = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir() / spool_name;

    io::FileWriterOptions opts;
    opts.sync_file_data = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_spool_file_path,
                                                               &_spool_file_writer, &opts));

    if (_buffered_vectors.empty()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_append_to_spool_file(_buffered_vectors.data(), _buffered_vectors.size()));
    _buffered_vectors.clear();
    return Status::OK();
}

// Closes and releases the spool file writer, if one is open.
// When close() fails the error is returned and the writer is left in place.
// A no-op returning OK when no spool writer exists.
Status AnnIndexColumnWriter::_flush_spool_writer() {
    if (_spool_file_writer != nullptr) {
        RETURN_IF_ERROR(_spool_file_writer->close());
        _spool_file_writer.reset();
    }
    return Status::OK();
}

Status AnnIndexColumnWriter::_train_and_add(Int64 min_train_rows) {
if (_total_rows < min_train_rows) {
LOG_INFO(
"Total data size {} is less than minimum {} rows required for ANN index training. "
"Skipping index building for this segment.",
_total_rows, min_train_rows);
RETURN_IF_ERROR(_flush_spool_writer());
return _index_file_writer->delete_index(_index_meta);
}

DCHECK(_training_sample.size() % _vector_index->get_dimension() == 0);
const Int64 train_rows =
cast_set<Int64>(_training_sample.size() / _vector_index->get_dimension());
DORIS_CHECK(train_rows >= min_train_rows);
RETURN_IF_ERROR(_vector_index->train(train_rows, _training_sample.data()));
_training_sample.clear();
if (_spool_file_writer == nullptr) {
RETURN_IF_ERROR(_add_buffered_vectors());
} else {
DCHECK(_float_array.size() % _vector_index->get_dimension() == 0);

Int64 num_rows = _float_array.size() / _vector_index->get_dimension();

if (num_rows >= min_train_rows) {
RETURN_IF_ERROR(_vector_index->train(num_rows, _float_array.data()));
RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data()));
_float_array.clear();
return _vector_index->save(_dir.get());
} else {
// It happens to have not enough data to train.
// If we have data to add before, we still need to save the index.
if (_need_save_index) {
// For IVF indexes, adding remaining vectors without training is acceptable
// because the quantizer was already trained on previous batches. These vectors
// are simply added to the nearest clusters without retraining.
RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data()));
_float_array.clear();
return _vector_index->save(_dir.get());
} else {
// Not enough data to train and no data added before.
// Means this is a very small segment, we can skip the index building.
// We need to delete the directory entry from index_file_writer to avoid
// writing an empty/invalid index file which causes "IndexInput read past EOF" error.
LOG_INFO(
"Remaining data size {} is less than minimum {} rows required for ANN "
"index "
"training. Skipping index building for this segment.",
num_rows, min_train_rows);
_float_array.clear();
return _index_file_writer->delete_index(_index_meta);
}
RETURN_IF_ERROR(_flush_spool_writer());
RETURN_IF_ERROR(_add_spooled_vectors());
}
return _vector_index->save(_dir.get());
}

// Adds every vector held in `_buffered_vectors` to the index in a single call and then
// empties the buffer. Returns OK without touching the index when the buffer is empty.
Status AnnIndexColumnWriter::_add_buffered_vectors() {
    const size_t dim = _vector_index->get_dimension();
    DCHECK(_buffered_vectors.size() % dim == 0);
    if (!_buffered_vectors.empty()) {
        const auto buffered_rows = cast_set<Int64>(_buffered_vectors.size() / dim);
        RETURN_IF_ERROR(_vector_index->add(buffered_rows, _buffered_vectors.data()));
        _buffered_vectors.clear();
    }
    return Status::OK();
}

// Streams the spooled vectors back from disk and adds them to the index in batches of
// at most chunk_size() rows.
//
// `_training_sample` is reused as the read buffer: it is resized to one chunk, filled
// from the file on each iteration, and cleared once all data has been added.
//
// Returns:
//   - the error from opening, reading or closing the spool file, or from index add();
//   - IOError if the spool file does not contain a whole number of rows, or if a read
//     returns fewer bytes than requested;
//   - OK once every spooled row has been added.
Status AnnIndexColumnWriter::_add_spooled_vectors() {
    DCHECK(!_spool_file_path.empty());
    io::FileReaderSPtr reader;
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_spool_file_path, &reader));

    const size_t dim = _vector_index->get_dimension();
    DCHECK(dim > 0);
    const size_t row_bytes = dim * sizeof(float);
    // The file size cannot change while we read it, so query it once.
    const size_t file_bytes = reader->size();

    // Validate the whole-row invariant in release builds too. Otherwise a truncated
    // spool file would have its trailing partial row silently dropped by the integer
    // division that computes the row count below.
    if (row_bytes == 0 || file_bytes % row_bytes != 0) {
        return Status::IOError(
                "ANN index build spool file {} has {} bytes, which is not a multiple of the "
                "row size {} bytes",
                _spool_file_path.native(), file_bytes, row_bytes);
    }

    const size_t chunk_elements = AnnIndexColumnWriter::chunk_size() * dim;
    _training_sample.resize(chunk_elements);
    const size_t buffer_bytes = chunk_elements * sizeof(float);
    size_t offset = 0;
    while (offset < file_bytes) {
        const size_t bytes_to_read = std::min(buffer_bytes, file_bytes - offset);
        DCHECK(bytes_to_read % sizeof(float) == 0);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(reader->read_at(
                offset, Slice(reinterpret_cast<uint8_t*>(_training_sample.data()), bytes_to_read),
                &bytes_read));
        // A short read would leave stale data in the buffer; treat it as an error.
        if (bytes_read != bytes_to_read) {
            return Status::IOError(
                    "Failed to read ANN index build spool file {}, expect {} bytes, "
                    "got {} bytes",
                    _spool_file_path.native(), bytes_to_read, bytes_read);
        }
        DCHECK((bytes_read / sizeof(float)) % dim == 0);
        RETURN_IF_ERROR(_vector_index->add(cast_set<Int64>(bytes_read / sizeof(float) / dim),
                                           _training_sample.data()));
        offset += bytes_read;
    }
    RETURN_IF_ERROR(reader->close());
    _training_sample.clear();
    return Status::OK();
}

void AnnIndexColumnWriter::_delete_spool_file() {
if (_spool_file_writer != nullptr) {
Status st = _spool_file_writer->close();
if (!st.ok()) {
LOG(WARNING) << "Failed to close ANN index build spool file "
<< _spool_file_path.native() << ": " << st;
}
_spool_file_writer.reset();
}
if (!_spool_file_path.empty()) {
Status st = io::global_local_filesystem()->delete_file(_spool_file_path);
if (!st.ok()) {
LOG(WARNING) << "Failed to delete ANN index build spool file "
<< _spool_file_path.native() << ": " << st;
}
_spool_file_path.clear();
}
}
} // namespace doris::segment_v2
Loading
Loading