diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7b6db1636234aa..1f86e8093109ae 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1757,12 +1757,19 @@ DEFINE_String(ann_index_ivf_list_cache_limit, "70%"); // Stale sweep time for ANN index IVF list cache in seconds. 3600s is 1 hour. DEFINE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec, "3600"); -// Chunk size for ANN/vector index building per training/adding batch +// Row count for ANN/vector index build add batch and memory buffer before spill. // 1M By default. DEFINE_mInt64(ann_index_build_chunk_size, "1000000"); DEFINE_Validator(ann_index_build_chunk_size, [](const int64_t config) -> bool { return config > 0; }); +// Maximum row count for ANN/vector index training sample. +// The effective sample keeps at least the index-required minimum training rows. +// 1M By default. +DEFINE_mInt64(ann_index_build_max_train_rows, "1000000"); +DEFINE_Validator(ann_index_build_max_train_rows, + [](const int64_t config) -> bool { return config > 0; }); + DEFINE_mBool(enable_wal_tde, "false"); DEFINE_mBool(print_stack_when_cache_miss, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 427282a4452bc4..fb382777d8e26f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1809,8 +1809,10 @@ DECLARE_mInt32(max_segment_partial_column_cache_size); DECLARE_String(ann_index_ivf_list_cache_limit); // Stale sweep time for ANN index IVF list cache in seconds. DECLARE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec); -// Chunk size for ANN/vector index building per training/adding batch +// Row count for ANN/vector index build add batch and memory buffer before spill. DECLARE_mInt64(ann_index_build_chunk_size); +// Maximum row count for ANN/vector index training sample. +DECLARE_mInt64(ann_index_build_max_train_rows); DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction); DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction); diff --git a/be/src/storage/index/ann/ann_index_writer.cpp b/be/src/storage/index/ann/ann_index_writer.cpp index 28d348cc319a48..12382f68481455 100644 --- a/be/src/storage/index/ann/ann_index_writer.cpp +++ b/be/src/storage/index/ann/ann_index_writer.cpp @@ -17,13 +17,22 @@ #include "storage/index/ann/ann_index_writer.h" +#include + +#include #include #include #include #include "common/cast_set.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "runtime/exec_env.h" #include "storage/index/ann/faiss_ann_index.h" #include "storage/index/inverted/inverted_index_fs_directory.h" +#include "util/slice.h" +#include "util/uid_util.h" namespace doris::segment_v2 { static std::string get_or_default(const std::map& properties, @@ -39,7 +48,9 @@ AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer, const TabletIndex* index_meta) : _index_file_writer(index_file_writer), _index_meta(index_meta) {} -AnnIndexColumnWriter::~AnnIndexColumnWriter() {} +AnnIndexColumnWriter::~AnnIndexColumnWriter() { + _delete_spool_file(); +} Status AnnIndexColumnWriter::init() { Result> compound_dir = _index_file_writer->open(_index_meta); @@ -77,8 +88,14 @@ Status AnnIndexColumnWriter::init() { index_type, build_parameter.dim, metric_type, build_parameter.max_degree, build_parameter.ef_construction, quantizer); - size_t block_size = AnnIndexColumnWriter::chunk_size() * build_parameter.dim; - _float_array.reserve(block_size); + const size_t chunk_elements = AnnIndexColumnWriter::chunk_size() * build_parameter.dim; + const Int64 min_train_rows = _vector_index->get_min_train_rows(); + if (min_train_rows > 0) { + _training_sample.reserve(_training_sample_rows_limit(min_train_rows) * build_parameter.dim); + } + _buffered_vectors.reserve(chunk_elements); + _training_sample_seen_rows = 0; + _training_sample_rng.seed(0); return Status::OK(); } @@ -87,7 +104,11 @@ Status AnnIndexColumnWriter::add_values(const std::string fn, const void* values return Status::OK(); } -void AnnIndexColumnWriter::close_on_error() {} +void AnnIndexColumnWriter::close_on_error() { + _delete_spool_file(); + _training_sample.clear(); + _buffered_vectors.clear(); +} Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map, const uint8_t* offsets_ptr, @@ -109,26 +130,13 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val const float* p = reinterpret_cast(value_ptr); - const size_t full_elements = AnnIndexColumnWriter::chunk_size() * dim; - size_t remaining_elements = num_rows * dim; - size_t src_offset = 0; - while (remaining_elements > 0) { - size_t available_space = full_elements - _float_array.size(); - size_t elements_to_add = std::min(remaining_elements, available_space); - - _float_array.insert(_float_array.end(), p + src_offset, p + src_offset + elements_to_add); - src_offset += elements_to_add; - remaining_elements -= elements_to_add; - - if (_float_array.size() == full_elements) { - RETURN_IF_ERROR( - _vector_index->train(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - RETURN_IF_ERROR( - _vector_index->add(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - _float_array.clear(); - _need_save_index = true; - } + const Int64 min_train_rows = _vector_index->get_min_train_rows(); + if (min_train_rows == 0) { + RETURN_IF_ERROR(_append_vectors_no_train(p, num_rows)); + } else { + RETURN_IF_ERROR(_append_vectors_need_train(p, num_rows, min_train_rows)); } + _total_rows += cast_set(num_rows); return Status::OK(); } @@ -146,54 +154,198 @@ int64_t AnnIndexColumnWriter::size() const { } Status AnnIndexColumnWriter::finish() { - Int64 min_train_rows = _vector_index->get_min_train_rows(); - - // Check if we have enough rows to train the index - // train/add the remaining data - if (_float_array.empty()) { - if (_need_save_index) { - return _vector_index->save(_dir.get()); - } else { - // No data was added at all. This can happen if the segment has 0 rows - // or all rows were filtered out. We need to delete the directory entry - // to avoid writing an empty/invalid index file. - LOG_INFO("No data to train/add for ANN index. Skipping index building."); - return _index_file_writer->delete_index(_index_meta); + if (_total_rows == 0) { + LOG_INFO("No data to train/add for ANN index. Skipping index building."); + Status st = _index_file_writer->delete_index(_index_meta); + _delete_spool_file(); + return st; + } + + const Int64 min_train_rows = _vector_index->get_min_train_rows(); + Status st = + min_train_rows == 0 ? _vector_index->save(_dir.get()) : _train_and_add(min_train_rows); + _delete_spool_file(); + return st; +} + +Status AnnIndexColumnWriter::_append_vectors_no_train(const float* vectors, size_t num_rows) { + DCHECK(vectors != nullptr); + DCHECK(num_rows > 0); + return _vector_index->add(cast_set(num_rows), vectors); +} + +Status AnnIndexColumnWriter::_append_vectors_need_train(const float* vectors, size_t num_rows, + Int64 min_train_rows) { + DCHECK(vectors != nullptr); + DCHECK(num_rows > 0); + + const size_t dim = _vector_index->get_dimension(); + const size_t num_elements = num_rows * dim; + _sample_training_vectors(vectors, num_rows, dim, _training_sample_rows_limit(min_train_rows)); + + if (_spool_file_writer != nullptr) { + return _append_to_spool_file(vectors, num_elements); + } + + const size_t buffer_elements_limit = AnnIndexColumnWriter::chunk_size() * dim; + if (_buffered_vectors.size() + num_elements <= buffer_elements_limit) { + _buffered_vectors.insert(_buffered_vectors.end(), vectors, vectors + num_elements); + return Status::OK(); + } + + RETURN_IF_ERROR(_spill_buffered_vectors()); + return _append_to_spool_file(vectors, num_elements); +} + +size_t AnnIndexColumnWriter::_training_sample_rows_limit(Int64 min_train_rows) const { + DCHECK(min_train_rows > 0); + const Int64 bounded_rows = std::min(AnnIndexColumnWriter::chunk_size(), + config::ann_index_build_max_train_rows); + return cast_set(std::max(min_train_rows, bounded_rows)); +} + +void AnnIndexColumnWriter::_sample_training_vectors(const float* vectors, size_t num_rows, + size_t dim, size_t sample_rows_limit) { + DCHECK(vectors != nullptr); + DCHECK(num_rows > 0); + DCHECK(dim > 0); + DCHECK(sample_rows_limit > 0); + DCHECK(_training_sample.size() % dim == 0); + + for (size_t row = 0; row < num_rows; ++row) { + const float* vector = vectors + row * dim; + ++_training_sample_seen_rows; + const size_t sample_rows = _training_sample.size() / dim; + if (sample_rows < sample_rows_limit) { + _training_sample.insert(_training_sample.end(), vector, vector + dim); + continue; + } + + std::uniform_int_distribution distribution(0, _training_sample_seen_rows - 1); + const uint64_t selected = distribution(_training_sample_rng); + if (selected < sample_rows_limit) { + float* dst = _training_sample.data() + cast_set(selected) * dim; + std::copy(vector, vector + dim, dst); } + } +} + +Status AnnIndexColumnWriter::_append_to_spool_file(const float* vectors, size_t num_elements) { + const size_t bytes = num_elements * sizeof(float); + return _spool_file_writer->append(Slice(reinterpret_cast(vectors), bytes)); +} + +Status AnnIndexColumnWriter::_spill_buffered_vectors() { + DCHECK(_spool_file_writer == nullptr); + DORIS_CHECK(ExecEnv::GetInstance()->get_tmp_file_dirs() != nullptr); + _spool_file_path = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir() / + fmt::format("ann_index_build_{}.spool", UniqueId::gen_uid().to_string()); + io::FileWriterOptions opts; + opts.sync_file_data = false; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_spool_file_path, + &_spool_file_writer, &opts)); + if (!_buffered_vectors.empty()) { + RETURN_IF_ERROR(_append_to_spool_file(_buffered_vectors.data(), _buffered_vectors.size())); + _buffered_vectors.clear(); + } + return Status::OK(); +} + +Status AnnIndexColumnWriter::_flush_spool_writer() { + if (_spool_file_writer == nullptr) { + return Status::OK(); + } + RETURN_IF_ERROR(_spool_file_writer->close()); + _spool_file_writer.reset(); + return Status::OK(); +} + +Status AnnIndexColumnWriter::_train_and_add(Int64 min_train_rows) { + if (_total_rows < min_train_rows) { + LOG_INFO( + "Total data size {} is less than minimum {} rows required for ANN index training. " + "Skipping index building for this segment.", + _total_rows, min_train_rows); + RETURN_IF_ERROR(_flush_spool_writer()); + return _index_file_writer->delete_index(_index_meta); + } + + DCHECK(_training_sample.size() % _vector_index->get_dimension() == 0); + const Int64 train_rows = + cast_set(_training_sample.size() / _vector_index->get_dimension()); + DORIS_CHECK(train_rows >= min_train_rows); + RETURN_IF_ERROR(_vector_index->train(train_rows, _training_sample.data())); + _training_sample.clear(); + if (_spool_file_writer == nullptr) { + RETURN_IF_ERROR(_add_buffered_vectors()); } else { - DCHECK(_float_array.size() % _vector_index->get_dimension() == 0); - - Int64 num_rows = _float_array.size() / _vector_index->get_dimension(); - - if (num_rows >= min_train_rows) { - RETURN_IF_ERROR(_vector_index->train(num_rows, _float_array.data())); - RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); - return _vector_index->save(_dir.get()); - } else { - // It happens to have not enough data to train. - // If we have data to add before, we still need to save the index. - if (_need_save_index) { - // For IVF indexes, adding remaining vectors without training is acceptable - // because the quantizer was already trained on previous batches. These vectors - // are simply added to the nearest clusters without retraining. - RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); - return _vector_index->save(_dir.get()); - } else { - // Not enough data to train and no data added before. - // Means this is a very small segment, we can skip the index building. - // We need to delete the directory entry from index_file_writer to avoid - // writing an empty/invalid index file which causes "IndexInput read past EOF" error. - LOG_INFO( - "Remaining data size {} is less than minimum {} rows required for ANN " - "index " - "training. Skipping index building for this segment.", - num_rows, min_train_rows); - _float_array.clear(); - return _index_file_writer->delete_index(_index_meta); - } + RETURN_IF_ERROR(_flush_spool_writer()); + RETURN_IF_ERROR(_add_spooled_vectors()); + } + return _vector_index->save(_dir.get()); +} + +Status AnnIndexColumnWriter::_add_buffered_vectors() { + const size_t dim = _vector_index->get_dimension(); + DCHECK(_buffered_vectors.size() % dim == 0); + if (_buffered_vectors.empty()) { + return Status::OK(); + } + RETURN_IF_ERROR(_vector_index->add(cast_set(_buffered_vectors.size() / dim), + _buffered_vectors.data())); + _buffered_vectors.clear(); + return Status::OK(); +} + +Status AnnIndexColumnWriter::_add_spooled_vectors() { + DCHECK(!_spool_file_path.empty()); + io::FileReaderSPtr reader; + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_spool_file_path, &reader)); + + const size_t dim = _vector_index->get_dimension(); + const size_t chunk_elements = AnnIndexColumnWriter::chunk_size() * dim; + _training_sample.resize(chunk_elements); + const size_t buffer_bytes = chunk_elements * sizeof(float); + size_t offset = 0; + while (offset < reader->size()) { + const size_t bytes_to_read = std::min(buffer_bytes, reader->size() - offset); + DCHECK(bytes_to_read % sizeof(float) == 0); + size_t bytes_read = 0; + RETURN_IF_ERROR(reader->read_at( + offset, Slice(reinterpret_cast(_training_sample.data()), bytes_to_read), + &bytes_read)); + if (bytes_read != bytes_to_read) { + return Status::IOError( + "Failed to read ANN index build spool file {}, expect {} bytes, " + "got {} bytes", + _spool_file_path.native(), bytes_to_read, bytes_read); + } + DCHECK((bytes_read / sizeof(float)) % dim == 0); + RETURN_IF_ERROR(_vector_index->add(cast_set(bytes_read / sizeof(float) / dim), + _training_sample.data())); + offset += bytes_read; + } + RETURN_IF_ERROR(reader->close()); + _training_sample.clear(); + return Status::OK(); +} + +void AnnIndexColumnWriter::_delete_spool_file() { + if (_spool_file_writer != nullptr) { + Status st = _spool_file_writer->close(); + if (!st.ok()) { + LOG(WARNING) << "Failed to close ANN index build spool file " + << _spool_file_path.native() << ": " << st; + } + _spool_file_writer.reset(); + } + if (!_spool_file_path.empty()) { + Status st = io::global_local_filesystem()->delete_file(_spool_file_path); + if (!st.ok()) { + LOG(WARNING) << "Failed to delete ANN index build spool file " + << _spool_file_path.native() << ": " << st; } + _spool_file_path.clear(); } } } // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/ann_index_writer.h b/be/src/storage/index/ann/ann_index_writer.h index 7b7e63f8574439..80dff8582d7ebd 100644 --- a/be/src/storage/index/ann/ann_index_writer.h +++ b/be/src/storage/index/ann/ann_index_writer.h @@ -24,11 +24,14 @@ #include #include +#include #include #include #include "common/config.h" #include "core/pod_array.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/path.h" #include "storage/index/ann/ann_index.h" #include "storage/index/index_file_writer.h" #include "storage/index/index_writer.h" @@ -71,16 +74,40 @@ class AnnIndexColumnWriter : public IndexColumnWriter { Status finish() override; private: + Status _append_vectors_need_train(const float* vectors, size_t num_rows, Int64 min_train_rows); + Status _append_vectors_no_train(const float* vectors, size_t num_rows); + size_t _training_sample_rows_limit(Int64 min_train_rows) const; + void _sample_training_vectors(const float* vectors, size_t num_rows, size_t dim, + size_t sample_rows_limit); + Status _append_to_spool_file(const float* vectors, size_t num_elements); + Status _spill_buffered_vectors(); + Status _flush_spool_writer(); + Status _train_and_add(Int64 min_train_rows); + Status _add_buffered_vectors(); + Status _add_spooled_vectors(); + void _delete_spool_file(); + +#ifdef BE_TEST + friend class TestAnnIndexColumnWriter; +#endif + // VectorIndex shoule be managed by some cache. // VectorIndex should be weak shared by AnnIndexWriter and VectorIndexReader // This should be a weak_ptr std::shared_ptr _vector_index; - // _float_array is used to buffer the float data before training/adding to vector index - // if we dont do this, the performance(recall) will be very poor when adding small number of vectors one by one - PODArray _float_array; + // _training_sample keeps a bounded reservoir sample for training. Full vectors are spooled + // separately so FAISS is trained once before any vector is added. + PODArray _training_sample; + uint64_t _training_sample_seen_rows = 0; + std::mt19937_64 _training_sample_rng {0}; + // _buffered_vectors keeps small training-required segments in memory. Larger segments spill + // to _spool_file_path before finish(). + PODArray _buffered_vectors; + io::Path _spool_file_path; + io::FileWriterPtr _spool_file_writer; + int64_t _total_rows = 0; IndexFileWriter* _index_file_writer; const TabletIndex* _index_meta; std::shared_ptr _dir; - bool _need_save_index = false; }; } // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp b/be/src/storage/index/ann/faiss_ann_index.cpp index f933f3c683f940..68b06db2b9061a 100644 --- a/be/src/storage/index/ann/faiss_ann_index.cpp +++ b/be/src/storage/index/ann/faiss_ann_index.cpp @@ -501,7 +501,8 @@ Int64 FaissVectorIndex::get_min_train_rows() const { // For IVF indexes, the minimum number of training points should be at least // equal to the number of clusters (nlist). FAISS requires this for k-means clustering. Int64 ivf_min = 0; - if (_params.index_type == FaissBuildParameter::IndexType::IVF) { + if (_params.index_type == FaissBuildParameter::IndexType::IVF || + _params.index_type == FaissBuildParameter::IndexType::IVF_ON_DISK) { ivf_min = _params.ivf_nlist; } diff --git a/be/src/storage/index/ann/faiss_ann_index.h b/be/src/storage/index/ann/faiss_ann_index.h index e1f1f65a3adadb..ffc80610c14300 100644 --- a/be/src/storage/index/ann/faiss_ann_index.h +++ b/be/src/storage/index/ann/faiss_ann_index.h @@ -213,8 +213,8 @@ class FaissVectorIndex : public VectorIndex { /** * @brief Returns the minimum number of rows required for training the index. * - * For IVF index types, this returns ivf_nlist (the number of clusters). - * For HNSW, this returns 0 as it doesn't require minimum training data. + * For IVF index types, this includes ivf_nlist (the number of clusters). + * Quantized indexes may require additional training rows. * * @return Minimum number of rows required for training */ diff --git a/be/test/storage/index/ann/ann_index_writer_test.cpp b/be/test/storage/index/ann/ann_index_writer_test.cpp index bb30f9e19794af..bc02fa369480e1 100644 --- a/be/test/storage/index/ann/ann_index_writer_test.cpp +++ b/be/test/storage/index/ann/ann_index_writer_test.cpp @@ -26,10 +26,13 @@ #include #include +#include "common/config.h" +#include "storage/index/ann/faiss_ann_index.h" #include "storage/index/ann/vector_search_utils.h" #include "storage/index/index_file_writer.h" #include "storage/index/inverted/inverted_index_fs_directory.h" #include "storage/tablet/tablet_schema.h" +#include "util/defer_op.h" using namespace doris::vector_search_utils; @@ -60,7 +63,7 @@ class TestAnnIndexColumnWriter : public AnnIndexColumnWriter { : AnnIndexColumnWriter(index_file_writer, index_meta) {} void set_vector_index(std::shared_ptr index) { _vector_index = index; } - void set_need_save_index(bool value) { _need_save_index = value; } + bool has_spool_file() const { return !_spool_file_path.empty(); } }; class AnnIndexWriterTest : public ::testing::Test { @@ -427,15 +430,15 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(6, testing::_)) + .Times(2) + .WillRepeatedly(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); - // CHUNK_SIZE = 10 + // CHUNK_SIZE = 10. HNSW flat does not require training, so vectors are added directly + // during add_array_values() and no build spool file is created. const size_t dim = 4; { @@ -474,10 +477,144 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { EXPECT_TRUE(status.ok()); } + EXPECT_FALSE(writer->has_spool_file()); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + Status status = writer->finish(); EXPECT_TRUE(status.ok()); } +TEST_F(AnnIndexWriterTest, TestTrainRequiredSmallDataStaysInMemory) { + auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); + + const size_t dim = 4; + const size_t num_rows = 6; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); + } + std::vector offsets; + for (size_t i = 0; i <= num_rows; ++i) { + offsets.push_back(i * dim); + } + + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); + EXPECT_TRUE(status.ok()); + EXPECT_FALSE(writer->has_spool_file()); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); + { + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(6, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(6, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + } + + status = writer->finish(); + EXPECT_TRUE(status.ok()); +} + +TEST_F(AnnIndexWriterTest, TestTrainingSampleUsesReservoirAndMaxRows) { + const int64_t old_max_train_rows = config::ann_index_build_max_train_rows; + config::ann_index_build_max_train_rows = 6; + doris::Defer restore_config { + [&] { config::ann_index_build_max_train_rows = old_max_train_rows; }}; + + auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); + + const size_t dim = 4; + const size_t num_rows = 20; + std::vector vectors(num_rows * dim); + for (size_t row = 0; row < num_rows; ++row) { + for (size_t col = 0; col < dim; ++col) { + vectors[row * dim + col] = static_cast(row); + } + } + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); + } + + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); + EXPECT_TRUE(status.ok()); + EXPECT_TRUE(writer->has_spool_file()); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); + { + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(6, testing::_)) + .Times(1) + .WillOnce(testing::Invoke([&](Int64 n, const float* vec) { + EXPECT_EQ(n, 6); + bool has_row_after_initial_sample = false; + bool is_prefix_sample = true; + for (size_t row = 0; row < static_cast(n); ++row) { + const auto row_id = static_cast(vec[row * dim]); + EXPECT_LT(row_id, num_rows); + if (row_id >= static_cast(n)) { + has_row_after_initial_sample = true; + } + if (row_id != row) { + is_prefix_sample = false; + } + } + EXPECT_TRUE(has_row_after_initial_sample); + EXPECT_FALSE(is_prefix_sample); + return Status::OK(); + })); + EXPECT_CALL(*mock_index, add(10, testing::_)) + .Times(2) + .WillRepeatedly(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + } + + status = writer->finish(); + EXPECT_TRUE(status.ok()); +} + TEST_F(AnnIndexWriterTest, TestCreateFromIndexColumnWriter) { TabletSchemaSPtr tablet_schema = std::make_shared(); TabletSchemaPB tablet_schema_pb; @@ -587,11 +724,11 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); EXPECT_CALL(*mock_index, train(10, testing::_)) .Times(1) .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -615,6 +752,7 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { num_rows); EXPECT_TRUE(status.ok()); } + EXPECT_FALSE(writer->has_spool_file()); { const size_t num_rows = 6; @@ -633,6 +771,7 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { num_rows); EXPECT_TRUE(status.ok()); } + EXPECT_TRUE(writer->has_spool_file()); Status status = writer->finish(); EXPECT_TRUE(status.ok()); @@ -660,8 +799,7 @@ TEST_F(AnnIndexWriterTest, TestSkipTrainWhenRemainderLessThanNlist) { writer->set_vector_index(mock_index); // CHUNK_SIZE = 10, nlist = 5 - // Add 12 rows: first 10 will be trained/added in one batch, remaining 2 < 5 - // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save + // Add 12 rows: train once with the first chunk, then add all rows in chunks. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); EXPECT_CALL(*mock_index, train(10, testing::_)) .Times(1) @@ -689,6 +827,7 @@ TEST_F(AnnIndexWriterTest, TestSkipTrainWhenRemainderLessThanNlist) { num_rows); EXPECT_TRUE(status.ok()); } + EXPECT_FALSE(writer->has_spool_file()); // Add 2 more rows { @@ -704,6 +843,7 @@ TEST_F(AnnIndexWriterTest, TestSkipTrainWhenRemainderLessThanNlist) { num_rows); EXPECT_TRUE(status.ok()); } + EXPECT_TRUE(writer->has_spool_file()); Status status = writer->finish(); EXPECT_TRUE(status.ok()); @@ -731,15 +871,14 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { writer->set_vector_index(mock_index); // CHUNK_SIZE = 10, nlist = 3 - // Add 23 rows: 2 full chunks of 10, remaining 3 == nlist, so train remaining + // Add 23 rows: train once with the first chunk, then add all rows in chunks. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(3)); EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(2) .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); @@ -805,12 +944,11 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeSkipRemainder) { writer->set_vector_index(mock_index); // CHUNK_SIZE = 10, nlist = 4 - // Add 22 rows: 2 full chunks of 10, remaining 2 < 4 - // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save + // Add 22 rows: train once with the first chunk, then add all rows in chunks. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(4)); EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); + .Times(1) + .WillOnce(testing::Return(Status::OK())); EXPECT_CALL(*mock_index, add(10, testing::_)) .Times(2) .WillRepeatedly(testing::Return(Status::OK())); @@ -876,10 +1014,9 @@ TEST_F(AnnIndexWriterTest, TestSkipIndexWhenTotalRowsLessThanNlist) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - writer->set_need_save_index(false); // No previous training, so should skip entirely // Add only 3 rows, which is less than nlist (5) - // Since no data was trained before (_need_save_index = false), we should skip index building entirely + // Since the segment is too small to train, we should skip index building entirely // No train, add, or save should be called EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); @@ -923,18 +1060,11 @@ TEST_F(AnnIndexWriterTest, TestPQMinTrainRows) { writer->set_vector_index(mock_index); // Set up expectations: mock a very large min_train_rows threshold. - // Since we only provide 1000 vectors, which is less than 131072, training will happen in batches - // but finish() will skip saving since remaining data is insufficient + // Since we only provide 1000 vectors, finish() skips building the index. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // 1000 vectors will be processed in 100 batches of 10 vectors each - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(100) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(100) - .WillRepeatedly(testing::Return(Status::OK())); - // Since we have trained data in batches, the index will be saved even though total data is insufficient - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); const size_t dim = 4; @@ -961,6 +1091,18 @@ TEST_F(AnnIndexWriterTest, TestPQMinTrainRows) { EXPECT_TRUE(status.ok()); } +TEST_F(AnnIndexWriterTest, TestIVFOnDiskMinTrainRows) { + FaissVectorIndex index; + FaissBuildParameter params; + params.index_type = FaissBuildParameter::IndexType::IVF_ON_DISK; + params.quantizer = FaissBuildParameter::Quantizer::FLAT; + params.dim = 4; + params.ivf_nlist = 7; + + index.build(params); + EXPECT_EQ(index.get_min_train_rows(), 7); +} + TEST_F(AnnIndexWriterTest, TestSQMinTrainRows) { // Test that SQ quantizer requires sufficient training data // SQ requires at least nlist * 2 = 10 * 2 = 20 training vectors @@ -976,17 +1118,12 @@ TEST_F(AnnIndexWriterTest, TestSQMinTrainRows) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - // Set up expectations: SQ should require at least 20 training vectors - // Since we only provide 15 vectors, training will happen in batches but finish() will skip saving + // Set up expectations: SQ should require at least 20 training vectors. + // Since we only provide 15 vectors, finish() skips building the index. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(20)); - // 15 vectors will be processed in 1 batch of 10 vectors and remaining 5 vectors - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(5, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - // Since we have trained data, the index will be saved even though total data is insufficient - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); const size_t dim = 4; @@ -1029,17 +1166,20 @@ TEST_F(AnnIndexWriterTest, TestPQWithSufficientData) { // Mock min_train_rows to 131072 and provide exactly that amount. EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // Since we provide exactly 131072 vectors, they will be trained and added in chunks - // Each chunk is 10 vectors, so we expect 13107 train calls and 13107 add calls for full chunks - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(13107) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(13107) - .WillRepeatedly(testing::Return(Status::OK())); - // The remaining 2 vectors will be added without training since min_train_rows > 2 - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + // Since we provide exactly 131072 vectors, train once with all sampled rows and add in chunks. + { + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(131072, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(10, testing::_)) + .Times(13107) + .WillRepeatedly(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(2, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + } const size_t dim = 4; diff --git a/regression-test/data/ann_index_p0/ivf_pq_recall.out b/regression-test/data/ann_index_p0/ivf_pq_recall.out new file mode 100644 index 00000000000000..14aab16eedc5fa --- /dev/null +++ b/regression-test/data/ann_index_p0/ivf_pq_recall.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !row_count -- +800 + +-- !first_cluster_recall -- +20 + +-- !second_cluster_recall -- +20 diff --git a/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy b/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy new file mode 100644 index 00000000000000..a5cddfeb881e69 --- /dev/null +++ b/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("ivf_pq_recall", "nonConcurrent") { + sql "set enable_common_expr_pushdown=true;" + sql "set enable_ann_index_result_cache=false;" + sql "set ivf_nprobe=8;" + + setBeConfigTemporary([ann_index_build_chunk_size: 400]) { + sql "drop table if exists ivf_pq_recall" + sql """ + create table ivf_pq_recall ( + id int not null, + embedding array not null, + index idx_embedding (`embedding`) using ann properties( + "index_type" = "ivf", + "metric_type" = "l2_distance", + "nlist" = "8", + "dim" = "4", + "quantizer" = "pq", + "pq_m" = "2", + "pq_nbits" = "2" + ) + ) engine=olap + duplicate key(id) + distributed by hash(id) buckets 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + def formatFloat = { double value -> + String.format(java.util.Locale.ROOT, "%.3f", value) + } + def vector = { double x -> + "[${formatFloat(x)}, ${formatFloat(x * 2)}, ${formatFloat(x * 3)}, ${formatFloat(x * 4)}]" + } + def rows = [] + for (int i = 1; i <= 400; i++) { + double x = (i - 1) / 1000.0 + rows.add("(${i}, ${vector(x)})") + } + for (int i = 401; i <= 800; i++) { + double x = 1000.0 + (i - 401) / 1000.0 + rows.add("(${i}, ${vector(x)})") + } + sql "insert into ivf_pq_recall values ${rows.join(',')};" + sql "sync" + + qt_row_count "select count(*) from ivf_pq_recall;" + + qt_first_cluster_recall """ + select count(*) from ( + select id + from ivf_pq_recall + order by l2_distance_approximate(embedding, [0.0, 0.0, 0.0, 0.0]) + limit 20 + ) t + where id between 1 and 400; + """ + + qt_second_cluster_recall """ + select count(*) from ( + select id + from ivf_pq_recall + order by l2_distance_approximate(embedding, [1000.0, 2000.0, 3000.0, 4000.0]) + limit 20 + ) t + where id between 401 and 800; + """ + } +}