From b8268d48032094692493be73764acc1b39778030 Mon Sep 17 00:00:00 2001 From: "manthapavankumar11@gmail.com" Date: Sat, 18 Apr 2026 21:26:48 +0530 Subject: [PATCH] bulk insert data --- README.md | 50 ++++++++++++++++++++++++++ src/qql/ast_nodes.py | 10 ++++++ src/qql/cli.py | 4 +++ src/qql/executor.py | 81 ++++++++++++++++++++++++++++++++++++++++++ src/qql/lexer.py | 2 ++ src/qql/parser.py | 45 ++++++++++++++++++++++- tests/test_executor.py | 74 ++++++++++++++++++++++++++++++++++++++ tests/test_parser.py | 70 ++++++++++++++++++++++++++++++++++++ 8 files changed, 335 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c5837b..bbb9151 100644 --- a/README.md +++ b/README.md @@ -239,6 +239,56 @@ INSERT INTO COLLECTION articles VALUES {'text': 'hello world'} --- +### INSERT BULK — batch insert multiple points + +Inserts multiple documents in a single statement. Each item in the array must contain a `"text"` key. Every item is embedded, and the resulting points are upserted to Qdrant in **one batched call**, which is significantly faster than issuing one `INSERT` per record. + +If the collection does not exist yet, it is **created automatically** on the first bulk insert. + +**Syntax:** +``` +INSERT BULK INTO COLLECTION <name> VALUES [<dict>, <dict>, ...] +INSERT BULK INTO COLLECTION <name> VALUES [<dict>, ...] USING MODEL '<model>' +INSERT BULK INTO COLLECTION <name> VALUES [<dict>, ...] USING HYBRID +INSERT BULK INTO COLLECTION <name> VALUES [<dict>, ...] 
USING HYBRID DENSE MODEL '<model>' SPARSE MODEL '<model>' +``` + +**Examples:** + +Minimal bulk insert (text only): +```sql +INSERT BULK INTO COLLECTION articles VALUES [ + {'text': 'Qdrant supports cosine similarity search'}, + {'text': 'Sparse BM25 vectors enable keyword retrieval'}, + {'text': 'Hybrid search combines dense and sparse results via RRF'} +] +``` + +Bulk insert with metadata: +```sql +INSERT BULK INTO COLLECTION articles VALUES [ + {'text': 'Attention is all you need', 'author': 'vaswani', 'year': 2017}, + {'text': 'BERT: Pre-training of deep bidirectional transformers', 'author': 'devlin', 'year': 2018}, + {'text': 'Language models are few-shot learners', 'author': 'brown', 'year': 2020} +] +``` + +Bulk insert into a hybrid collection: +```sql +INSERT BULK INTO COLLECTION articles VALUES [ + {'text': 'Dense retrieval with FAISS', 'domain': 'ir'}, + {'text': 'Sparse retrieval with BM25', 'domain': 'ir'} +] USING HYBRID +``` + +**Rules:** +- Every dict in the array must contain a `"text"` key. Missing `text` on any item raises an error with the offending index. +- An empty array `[]` raises an error. +- A UUID is auto-generated for each point — you do not provide IDs. +- Supports all the same `USING` clauses as single `INSERT`. + +--- + ### SEARCH — find similar points Performs a **semantic similarity search**: your query text is embedded with the same model used during insert, then Qdrant finds the nearest vectors by cosine distance. diff --git a/src/qql/ast_nodes.py b/src/qql/ast_nodes.py index 49eadf1..fc95611 100644 --- a/src/qql/ast_nodes.py +++ b/src/qql/ast_nodes.py @@ -129,6 +129,15 @@ class InsertStmt: sparse_model: str | None = None # sparse model; None → SparseEmbedder.DEFAULT_MODEL +@dataclass(frozen=True) +class InsertBulkStmt: + collection: str + values_list: tuple[dict[str, Any], ...] 
# each dict must contain "text" + model: str | None # dense model; None → use config default + hybrid: bool = False + sparse_model: str | None = None + + @dataclass(frozen=True) class CreateCollectionStmt: collection: str @@ -169,6 +178,7 @@ class DeleteStmt: # Union type for all top-level statement nodes ASTNode = ( InsertStmt + | InsertBulkStmt | CreateCollectionStmt | DropCollectionStmt | ShowCollectionsStmt diff --git a/src/qql/cli.py b/src/qql/cli.py index bf1b3fe..3b63d8e 100644 --- a/src/qql/cli.py +++ b/src/qql/cli.py @@ -28,6 +28,10 @@ Optional: [yellow]USING MODEL[/yellow] '' Optional: [yellow]USING HYBRID[/yellow] [DENSE MODEL ''] [SPARSE MODEL ''] + [yellow]INSERT BULK INTO COLLECTION[/yellow] [yellow]VALUES[/yellow] [{[yellow]'text'[/yellow]: '...', ...}, ...] + Batch insert multiple points in a single call. Each dict must contain 'text'. + Supports the same [yellow]USING[/yellow] clauses as INSERT. + [yellow]CREATE COLLECTION[/yellow] [[yellow]HYBRID[/yellow]] Create a new collection. Add HYBRID for dense+sparse BM25 vectors. 
Optional: [yellow]USING MODEL[/yellow] '' diff --git a/src/qql/executor.py b/src/qql/executor.py index 0144420..62059ed 100644 --- a/src/qql/executor.py +++ b/src/qql/executor.py @@ -42,6 +42,7 @@ DropCollectionStmt, FilterExpr, InExpr, + InsertBulkStmt, InsertStmt, IsEmptyExpr, IsNotEmptyExpr, @@ -77,6 +78,8 @@ def __init__(self, client: QdrantClient, config: QQLConfig) -> None: self._config = config def execute(self, node: ASTNode) -> ExecutionResult: + if isinstance(node, InsertBulkStmt): + return self._execute_insert_bulk(node) if isinstance(node, InsertStmt): return self._execute_insert(node) if isinstance(node, CreateCollectionStmt): @@ -170,6 +173,84 @@ def _execute_insert(self, node: InsertStmt) -> ExecutionResult: data={"id": point_id, "collection": node.collection}, ) + def _execute_insert_bulk(self, node: InsertBulkStmt) -> ExecutionResult: + if not node.values_list: + raise QQLRuntimeError("INSERT BULK VALUES list is empty") + for i, vals in enumerate(node.values_list): + if "text" not in vals: + raise QQLRuntimeError( + f"INSERT BULK: item at index {i} is missing required 'text' field" + ) + + # ── Hybrid bulk INSERT: dense + sparse vectors ───────────────────── + if node.hybrid: + dense_model = node.model or self._config.default_model + sparse_model_name = node.sparse_model or SparseEmbedder.DEFAULT_MODEL + dense_embedder = Embedder(dense_model) + sparse_embedder = SparseEmbedder(sparse_model_name) + + points: list[PointStruct] = [] + for vals in node.values_list: + dense_vector = dense_embedder.embed(vals["text"]) + sparse_obj = sparse_embedder.embed(vals["text"]) + sparse_vector = SparseVector( + indices=sparse_obj["indices"], values=sparse_obj["values"] + ) + point_id = str(uuid.uuid4()) + points.append( + PointStruct( + id=point_id, + vector={"dense": dense_vector, "sparse": sparse_vector}, + payload=dict(vals), + ) + ) + + if not self._client.collection_exists(node.collection): + first_dense = dense_embedder.embed(node.values_list[0]["text"]) + 
self._client.create_collection( + collection_name=node.collection, + vectors_config={ + "dense": VectorParams(size=len(first_dense), distance=Distance.COSINE) + }, + sparse_vectors_config={ + "sparse": SparseVectorParams(modifier=Modifier.IDF) + }, + ) + + try: + self._client.upsert(collection_name=node.collection, points=points) + except UnexpectedResponse as e: + raise QQLRuntimeError(f"Qdrant error during INSERT BULK: {e}") from e + + return ExecutionResult( + success=True, + message=f"Inserted {len(points)} points (hybrid)", + ) + + # ── Standard dense-only bulk INSERT ─────────────────────────────── + model_name = node.model or self._config.default_model + embedder = Embedder(model_name) + + points = [] + for vals in node.values_list: + vector = embedder.embed(vals["text"]) + point_id = str(uuid.uuid4()) + points.append( + PointStruct(id=point_id, vector=vector, payload=dict(vals)) + ) + + self._ensure_collection(node.collection, len(points[0].vector)) + + try: + self._client.upsert(collection_name=node.collection, points=points) + except UnexpectedResponse as e: + raise QQLRuntimeError(f"Qdrant error during INSERT BULK: {e}") from e + + return ExecutionResult( + success=True, + message=f"Inserted {len(points)} points", + ) + def _execute_create(self, node: CreateCollectionStmt) -> ExecutionResult: if self._client.collection_exists(node.collection): return ExecutionResult( diff --git a/src/qql/lexer.py b/src/qql/lexer.py index d994f25..90b705d 100644 --- a/src/qql/lexer.py +++ b/src/qql/lexer.py @@ -7,6 +7,7 @@ class TokenKind(Enum): # ── Statement keywords ──────────────────────────────────────────────── INSERT = auto() + BULK = auto() INTO = auto() COLLECTION = auto() VALUES = auto() @@ -71,6 +72,7 @@ class TokenKind(Enum): _KEYWORDS: dict[str, TokenKind] = { # Statement keywords "INSERT": TokenKind.INSERT, + "BULK": TokenKind.BULK, "INTO": TokenKind.INTO, "COLLECTION": TokenKind.COLLECTION, "VALUES": TokenKind.VALUES, diff --git a/src/qql/parser.py 
b/src/qql/parser.py index 4e5b340..f466a76 100644 --- a/src/qql/parser.py +++ b/src/qql/parser.py @@ -10,6 +10,7 @@ DropCollectionStmt, FilterExpr, InExpr, + InsertBulkStmt, InsertStmt, IsEmptyExpr, IsNotEmptyExpr, @@ -70,8 +71,12 @@ def parse(self) -> ASTNode: # ── Statement parsers ───────────────────────────────────────────────── - def _parse_insert(self) -> InsertStmt: + def _parse_insert(self) -> InsertStmt | InsertBulkStmt: self._expect(TokenKind.INSERT) + if self._peek().kind == TokenKind.BULK: + self._advance() # consume BULK + return self._parse_insert_bulk_body() + # ── Standard single INSERT ──────────────────────────────────────── self._expect(TokenKind.INTO) self._expect(TokenKind.COLLECTION) collection = self._parse_identifier() @@ -102,6 +107,44 @@ def _parse_insert(self) -> InsertStmt: hybrid=hybrid, sparse_model=sparse_model, ) + def _parse_insert_bulk_body(self) -> InsertBulkStmt: + self._expect(TokenKind.INTO) + self._expect(TokenKind.COLLECTION) + collection = self._parse_identifier() + self._expect(TokenKind.VALUES) + raw_list = self._parse_list() + for i, item in enumerate(raw_list): + if not isinstance(item, dict): + raise QQLSyntaxError( + f"INSERT BULK VALUES item at index {i} must be a dict, " + f"got {type(item).__name__}", + 0, + ) + values_list: tuple[dict, ...] 
= tuple(raw_list) + model: str | None = None + hybrid: bool = False + sparse_model: str | None = None + if self._peek().kind == TokenKind.USING: + self._advance() # consume USING + if self._peek().kind == TokenKind.HYBRID: + self._advance() # consume HYBRID + hybrid = True + while self._peek().kind in (TokenKind.DENSE, TokenKind.SPARSE): + sub = self._advance() + self._expect(TokenKind.MODEL) + m = self._expect(TokenKind.STRING).value + if sub.kind == TokenKind.DENSE: + model = m + else: + sparse_model = m + else: + self._expect(TokenKind.MODEL) + model = self._expect(TokenKind.STRING).value + return InsertBulkStmt( + collection=collection, values_list=values_list, + model=model, hybrid=hybrid, sparse_model=sparse_model, + ) + def _parse_create(self) -> CreateCollectionStmt: self._expect(TokenKind.CREATE) self._expect(TokenKind.COLLECTION) diff --git a/tests/test_executor.py b/tests/test_executor.py index 04fc0a7..d930815 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -4,6 +4,7 @@ CreateCollectionStmt, DeleteStmt, DropCollectionStmt, + InsertBulkStmt, InsertStmt, SearchStmt, SearchWith, @@ -91,6 +92,79 @@ def test_insert_raises_on_dimension_mismatch(self, executor, mock_client): executor.execute(node) +class TestInsertBulk: + def test_bulk_insert_calls_upsert_once(self, executor, mock_client): + node = InsertBulkStmt( + collection="col", + values_list=({"text": "hello"}, {"text": "world"}), + model=None, + ) + executor.execute(node) + mock_client.upsert.assert_called_once() + + def test_bulk_insert_upserts_correct_count(self, executor, mock_client): + node = InsertBulkStmt( + collection="col", + values_list=({"text": "a"}, {"text": "b"}, {"text": "c"}), + model=None, + ) + executor.execute(node) + call_args = mock_client.upsert.call_args.kwargs + assert len(call_args["points"]) == 3 + + def test_bulk_insert_creates_collection_when_missing(self, executor, mock_client): + node = InsertBulkStmt( + collection="col", + values_list=({"text": 
"hello"},), + model=None, + ) + executor.execute(node) + mock_client.create_collection.assert_called_once() + + def test_bulk_insert_skips_create_when_exists(self, executor, mock_client): + mock_client.collection_exists.return_value = True + mock_client.get_collection.return_value.config.params.vectors.size = 384 + node = InsertBulkStmt( + collection="col", + values_list=({"text": "hello"},), + model=None, + ) + executor.execute(node) + mock_client.create_collection.assert_not_called() + + def test_bulk_insert_raises_on_missing_text(self, executor): + node = InsertBulkStmt( + collection="col", + values_list=({"text": "ok"}, {"author": "bob"}), + model=None, + ) + with pytest.raises(QQLRuntimeError, match="index 1"): + executor.execute(node) + + def test_bulk_insert_empty_list_raises(self, executor): + node = InsertBulkStmt(collection="col", values_list=(), model=None) + with pytest.raises(QQLRuntimeError, match="empty"): + executor.execute(node) + + def test_bulk_insert_result_message_contains_count(self, executor, mock_client): + node = InsertBulkStmt( + collection="col", + values_list=({"text": "a"}, {"text": "b"}), + model=None, + ) + result = executor.execute(node) + assert result.success is True + assert "2" in result.message + assert "points" in result.message + + def test_single_insert_unaffected_by_bulk_dispatch(self, executor, mock_client): + """Ensure single INSERT still routes correctly after bulk dispatch added.""" + node = InsertStmt(collection="notes", values={"text": "hello"}, model=None) + result = executor.execute(node) + assert result.success is True + assert "Inserted 1 point" in result.message + + class TestCreate: def test_create_new_collection(self, executor, mock_client): node = CreateCollectionStmt(collection="new_col") diff --git a/tests/test_parser.py b/tests/test_parser.py index 002be79..a0dab81 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -8,6 +8,7 @@ DeleteStmt, DropCollectionStmt, InExpr, + InsertBulkStmt, 
InsertStmt, IsEmptyExpr, IsNotEmptyExpr, @@ -88,6 +89,75 @@ def test_missing_text_is_not_parser_error(self): assert "text" not in node.values +class TestInsertBulk: + def test_basic_bulk_insert(self): + node = parse("INSERT BULK INTO COLLECTION col VALUES [{'text': 'hello'}]") + assert isinstance(node, InsertBulkStmt) + assert node.collection == "col" + assert len(node.values_list) == 1 + assert node.values_list[0]["text"] == "hello" + + def test_bulk_insert_two_items(self): + node = parse( + "INSERT BULK INTO COLLECTION col VALUES " + "[{'text': 'first'}, {'text': 'second'}]" + ) + assert isinstance(node, InsertBulkStmt) + assert len(node.values_list) == 2 + assert node.values_list[1]["text"] == "second" + + def test_bulk_insert_preserves_metadata(self): + node = parse( + "INSERT BULK INTO COLLECTION col VALUES " + "[{'text': 'hello', 'year': 2021}, {'text': 'world', 'year': 2022}]" + ) + assert node.values_list[0]["year"] == 2021 + assert node.values_list[1]["year"] == 2022 + + def test_bulk_insert_using_model(self): + node = parse( + "INSERT BULK INTO COLLECTION col VALUES [{'text': 'a'}] " + "USING MODEL 'BAAI/bge-base-en-v1.5'" + ) + assert node.model == "BAAI/bge-base-en-v1.5" + assert node.hybrid is False + + def test_bulk_insert_using_hybrid(self): + node = parse( + "INSERT BULK INTO COLLECTION col VALUES [{'text': 'a'}] USING HYBRID" + ) + assert node.hybrid is True + assert node.model is None + + def test_bulk_insert_using_hybrid_dense_model(self): + node = parse( + "INSERT BULK INTO COLLECTION col VALUES [{'text': 'a'}] " + "USING HYBRID DENSE MODEL 'BAAI/bge-base-en-v1.5'" + ) + assert node.hybrid is True + assert node.model == "BAAI/bge-base-en-v1.5" + + def test_bulk_insert_collection_name(self): + node = parse("INSERT BULK INTO COLLECTION my_notes VALUES [{'text': 'x'}]") + assert node.collection == "my_notes" + + def test_bulk_insert_case_insensitive(self): + node = parse("insert bulk into collection col values [{'text': 'hi'}]") + assert 
isinstance(node, InsertBulkStmt) + + def test_bulk_insert_default_model_is_none(self): + node = parse("INSERT BULK INTO COLLECTION col VALUES [{'text': 'a'}]") + assert node.model is None + assert node.sparse_model is None + assert node.hybrid is False + + def test_single_insert_still_works_after_bulk_addition(self): + """Ensure single INSERT flow is not broken by the BULK branch.""" + node = parse("INSERT INTO COLLECTION col VALUES {'text': 'hello'}") + assert isinstance(node, InsertStmt) + assert node.values == {"text": "hello"} + + class TestCreate: def test_create_collection(self): node = parse("CREATE COLLECTION my_col")