Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The doc mapping defines how a document and the fields it contains are stored and
| `tag_fields` | Collection of fields* explicitly defined in `field_mappings` whose values will be stored as part of the `tags` metadata. Allowed types are: `text` (with raw tokenizer), `i64` and `u64`. [Learn more about tags](../overview/concepts/querying.md#tag-pruning). | `[]` |
| `store_source` | Whether or not the original JSON document is stored or not in the index. | `false` |
| `timestamp_field` | Timestamp field* used for sharding documents in splits. The field has to be of type `datetime`. [Learn more about time sharding](./../overview/architecture.md). | `None` |
| `indexation_time_field` | Field that will hold the indexation time of the document. This field is populated automatically during indexation. The field has to be of type `datetime`. | `None` |
| `partition_key` | If set, quickwit will route documents into different splits depending on the field name declared as the `partition_key`. | `null` |
| `max_num_partitions` | Limits the number of splits created through partitioning. (See [Partitioning](../overview/concepts/querying.md#partitioning)) | `200` |
| `index_field_presence` | `exists` queries are enabled automatically for fast fields. To enable it for all other fields set this parameter to `true`. Enabling it can have a significant CPU-cost on indexing. | false |
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ impl crate::TestableForRegression for IndexConfig {
],
timestamp_field: Some("timestamp".to_string()),
secondary_timestamp_field: None,
indexation_time_field: None,
tag_fields: BTreeSet::from_iter(["tenant_id".to_string(), "log_level".to_string()]),
partition_key: Some("tenant_id".to_string()),
max_num_partitions: NonZeroU32::new(100).unwrap(),
Expand Down
37 changes: 37 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapper/doc_mapper_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub struct DocMapper {
timestamp_field_path: Option<Vec<String>>,
/// Secondary timestamp field name.
secondary_timestamp_field_name: Option<String>,
/// Indexation time field name.
indexation_time_field_name: Option<String>,
/// Root node of the field mapping tree.
/// See [`MappingNode`].
field_mappings: MappingNode,
Expand Down Expand Up @@ -128,6 +130,31 @@ fn validate_timestamp_field(
Ok(())
}

fn validate_indexation_time_field(
indexation_field_path: &str,
mapping_root_node: &MappingNode,
) -> anyhow::Result<()> {
if indexation_field_path.starts_with('.') || indexation_field_path.starts_with("\\.") {
bail!("indexation_time field `{indexation_field_path}` should not start with a `.`");
}
if indexation_field_path.ends_with('.') {
bail!("indexation_time field `{indexation_field_path}` should not end with a `.`");
}
let Some(indexation_time_field_type) =
mapping_root_node.find_field_mapping_type(indexation_field_path)
else {
bail!("could not find indexation_time field `{indexation_field_path}` in field mappings");
};
if let FieldMappingType::DateTime(_, cardinality) = &indexation_time_field_type {
if cardinality != &Cardinality::SingleValued {
bail!("indexation_time field `{indexation_field_path}` should be single-valued");
}
} else {
bail!("indexation_time field `{indexation_field_path}` should be a datetime field");
}
Ok(())
}

impl From<DocMapper> for DocMapperBuilder {
fn from(default_doc_mapper: DocMapper) -> Self {
let partition_key_str = default_doc_mapper.partition_key.to_string();
Expand All @@ -142,6 +169,7 @@ impl From<DocMapper> for DocMapperBuilder {
field_mappings: default_doc_mapper.field_mappings.into(),
timestamp_field: default_doc_mapper.timestamp_field_name,
secondary_timestamp_field: default_doc_mapper.secondary_timestamp_field_name,
indexation_time_field: default_doc_mapper.indexation_time_field_name,
tag_fields: default_doc_mapper.tag_field_names,
partition_key: partition_key_opt,
max_num_partitions: default_doc_mapper.max_num_partitions,
Expand Down Expand Up @@ -203,6 +231,9 @@ impl TryFrom<DocMapperBuilder> for DocMapper {
} else {
None
};
if let Some(indexation_time_field_name) = &doc_mapping.indexation_time_field {
validate_indexation_time_field(indexation_time_field_name, &field_mappings)?;
}
let schema = schema_builder.build();

let tokenizer_manager = create_default_quickwit_tokenizer_manager();
Expand Down Expand Up @@ -293,6 +324,7 @@ impl TryFrom<DocMapperBuilder> for DocMapper {
timestamp_field_name: doc_mapping.timestamp_field,
timestamp_field_path,
secondary_timestamp_field_name: doc_mapping.secondary_timestamp_field,
indexation_time_field_name: doc_mapping.indexation_time_field,
field_mappings,
concatenate_dynamic_fields,
Comment thread
Darkheir marked this conversation as resolved.
tag_field_names,
Expand Down Expand Up @@ -681,6 +713,11 @@ impl DocMapper {
self.secondary_timestamp_field_name.as_deref()
}

/// Returns the name of the field that receives the indexation time,
/// if one was configured in the doc mapping.
pub fn indexation_time_field_name(&self) -> Option<&str> {
    self.indexation_time_field_name
        .as_ref()
        .map(String::as_str)
}

/// Returns the tag `NameField`s on the current schema.
/// Returns an error if a tag field is not found in this schema.
pub fn tag_named_fields(&self) -> anyhow::Result<Vec<NamedField>> {
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ pub struct DocMapping {
#[serde(skip_serializing_if = "Option::is_none")]
pub secondary_timestamp_field: Option<String>,

/// Declares the field which will contain the indexation time for the document.
Comment thread
Darkheir marked this conversation as resolved.
/// This field is automatically populated by the indexer
/// with the time at which the document is indexed.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub indexation_time_field: Option<String>,

/// Declares the low cardinality fields for which the values are recorded directly in the
/// splits metadata.
#[schema(value_type = Vec<String>)]
Expand Down Expand Up @@ -207,6 +214,7 @@ mod tests {
],
timestamp_field: Some("timestamp".to_string()),
secondary_timestamp_field: None,
indexation_time_field: None,
tag_fields: BTreeSet::from_iter(["level".to_string()]),
partition_key: Some("tenant_id".to_string()),
max_num_partitions: NonZeroU32::new(100).unwrap(),
Expand Down
183 changes: 181 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tantivy::schema::{Field, Schema, Value};
use tantivy::store::{Compressor, ZstdCompressor};
use tantivy::tokenizer::TokenizerManager;
use tantivy::{DateTime, IndexBuilder, IndexSettings};
use time::OffsetDateTime;
use tokio::runtime::Handle;
use tokio::sync::Semaphore;
use tracing::{Span, info, info_span, warn};
Expand Down Expand Up @@ -99,6 +100,7 @@ struct IndexerState {
max_num_partitions: NonZeroU32,
index_settings: IndexSettings,
cooperative_indexing_opt: Option<CooperativeIndexingCycle>,
indexation_time_field_opt: Option<Field>,
}

impl IndexerState {
Expand Down Expand Up @@ -300,7 +302,15 @@ impl IndexerState {
.context("batch delta does not follow indexer checkpoint")?;
let mut memory_usage_delta: i64 = 0;
counters.num_doc_batches_in_workbench += 1;
for doc in batch.docs {
let indexation_time_opt = self
.indexation_time_field_opt
.map(|_| DateTime::from_utc(OffsetDateTime::now_utc()));
for mut doc in batch.docs {
if let (Some(indexation_time), Some(indexation_time_field)) =
(indexation_time_opt, self.indexation_time_field_opt)
{
doc.doc.add_date(indexation_time_field, indexation_time);
}
let ProcessedDoc {
doc,
timestamp_opt,
Expand Down Expand Up @@ -589,6 +599,17 @@ impl Indexer {
cooperative_indexing_permits,
)
});
let indexation_time_field_opt =
doc_mapper
.indexation_time_field_name()
.and_then(|name| match schema.get_field(name) {
Ok(field) => Some(field),
Err(_) => {
warn!("failed to find indexation time field '{}' in schema", name);
None
}
});

Self {
indexer_state: IndexerState {
pipeline_id,
Expand All @@ -604,6 +625,7 @@ impl Indexer {
index_settings,
max_num_partitions: doc_mapper.max_num_partitions(),
cooperative_indexing_opt,
indexation_time_field_opt,
},
index_serializer_mailbox,
indexing_workbench_opt: None,
Expand Down Expand Up @@ -743,7 +765,7 @@ mod tests {
EmptyResponse, LastDeleteOpstampResponse, MockMetastoreService,
};
use quickwit_proto::types::{IndexUid, NodeId, PipelineUid};
use tantivy::{DateTime, doc};
use tantivy::{DateTime, DocAddress, ReloadPolicy, TantivyDocument, doc};

use super::*;
use crate::actors::indexer::{IndexerCounters, record_timestamp};
Expand Down Expand Up @@ -1851,4 +1873,161 @@ mod tests {
universe.assert_quit().await;
Ok(())
}

/// Builds a `DocMapper` whose doc mapping declares `indexed_at` as the
/// `indexation_time_field`, alongside a `timestamp` datetime field and a
/// `body` text field used as the default search field.
fn doc_mapper_with_indexation_time() -> DocMapper {
    let doc_mapper_json: &str = r#"
    {
        "store_source": true,
        "index_field_presence": true,
        "default_search_fields": ["body"],
        "timestamp_field": "timestamp",
        "indexation_time_field": "indexed_at",
        "field_mappings": [
            {
                "name": "timestamp",
                "type": "datetime",
                "output_format": "unix_timestamp_secs",
                "fast": true
            },
            {
                "name": "body",
                "type": "text",
                "stored": true
            },
            {
                "name": "indexed_at",
                "type": "datetime",
                "output_format": "unix_timestamp_secs",
                "fast": true,
                "stored": true
            }
        ]
    }"#;
    serde_json::from_str::<DocMapper>(doc_mapper_json).unwrap()
}

/// End-to-end check that the indexer stamps every document with an
/// `indexed_at` datetime when the doc mapping declares an
/// `indexation_time_field`.
///
/// The timestamp is sampled once per `ProcessedDocBatch` (see `index_batch`),
/// so all documents of a single batch must carry the exact same value.
#[tokio::test]
async fn test_indexer_sets_indexation_time() -> anyhow::Result<()> {
    let index_uid = IndexUid::new_with_random_ulid("test-index");
    let pipeline_id = IndexingPipelineId {
        index_uid: index_uid.clone(),
        source_id: "test-source".to_string(),
        node_id: NodeId::from("test-node"),
        pipeline_uid: PipelineUid::default(),
    };
    // Doc mapper configured with `indexed_at` as the indexation time field.
    let doc_mapper = Arc::new(doc_mapper_with_indexation_time());
    let last_delete_opstamp = 10;
    let schema = doc_mapper.schema();
    let body_field = schema.get_field("body").unwrap();
    let timestamp_field = schema.get_field("timestamp").unwrap();
    let indexed_at_field = schema.get_field("indexed_at").unwrap();
    let indexing_directory = TempDirectory::for_test();
    let mut indexing_settings = IndexingSettings::for_test();
    // Target exactly 3 docs per split so one 3-doc batch triggers a commit.
    indexing_settings.split_num_docs_target = 3;
    let universe = Universe::with_accelerated_time();
    let (index_serializer_mailbox, index_serializer_inbox) = universe.create_test_mailbox();
    let mut mock_metastore = MockMetastoreService::new();
    mock_metastore
        .expect_last_delete_opstamp()
        .times(1)
        .returning(move |delete_opstamp_request| {
            assert_eq!(delete_opstamp_request.index_uid(), &index_uid);
            Ok(LastDeleteOpstampResponse::new(last_delete_opstamp))
        });
    // The indexer itself must never publish splits directly.
    mock_metastore.expect_publish_splits().never();
    let indexer = Indexer::new(
        pipeline_id,
        doc_mapper,
        MetastoreServiceClient::from_mock(mock_metastore),
        indexing_directory,
        indexing_settings,
        None,
        index_serializer_mailbox,
    );
    let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer);

    // Send 3 docs in a single batch so they all share the same indexation timestamp
    // (the timestamp is sampled once per batch in `index_batch`).
    indexer_mailbox
        .send_message(ProcessedDocBatch::new(
            vec![
                ProcessedDoc {
                    doc: doc!(
                        body_field => "document 1",
                        timestamp_field => DateTime::from_timestamp_secs(1_662_000_001),
                    ),
                    timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_001)),
                    partition: 1,
                    num_bytes: 30,
                },
                ProcessedDoc {
                    doc: doc!(
                        body_field => "document 2",
                        timestamp_field => DateTime::from_timestamp_secs(1_662_000_002),
                    ),
                    timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_002)),
                    partition: 1,
                    num_bytes: 30,
                },
                ProcessedDoc {
                    doc: doc!(
                        body_field => "document 3",
                        timestamp_field => DateTime::from_timestamp_secs(1_662_000_003),
                    ),
                    timestamp_opt: Some(DateTime::from_timestamp_secs(1_662_000_003)),
                    partition: 1,
                    num_bytes: 30,
                },
            ],
            SourceCheckpointDelta::from_range(0..3),
            false,
        ))
        .await?;

    indexer_handle.process_pending_and_observe().await;
    // The num-docs limit must have flushed exactly one single-split batch.
    let messages: Vec<IndexedSplitBatchBuilder> = index_serializer_inbox.drain_for_test_typed();
    assert_eq!(messages.len(), 1);
    let batch = messages.into_iter().next().unwrap();
    assert_eq!(batch.commit_trigger, CommitTrigger::NumDocsLimit);
    assert_eq!(batch.splits.len(), 1);
    assert_eq!(batch.splits[0].split_attrs.num_docs, 3);

    // Finalize the split and open the tantivy index to verify the `indexed_at` field.
    let indexed_split = batch.splits.into_iter().next().unwrap().finalize()?;
    let reader = indexed_split
        .index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    let searcher = reader.searcher();

    // Collect every `indexed_at` value present in the split.
    let mut indexed_at_values: Vec<DateTime> = Vec::new();
    for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
        for doc_id in 0..segment_reader.max_doc() {
            let doc_address = DocAddress::new(segment_ord as u32, doc_id);
            let doc: TantivyDocument = searcher.doc(doc_address)?;
            let indexed_at = doc
                .get_first(indexed_at_field)
                .and_then(|val| val.as_datetime())
                .expect("indexed_at field must be set on every indexed document");
            indexed_at_values.push(indexed_at);
        }
    }

    // All 3 documents must have been stamped with the indexation time.
    assert_eq!(indexed_at_values.len(), 3);
    // Because the timestamp is captured once for the whole batch, every document
    // in the batch must carry exactly the same `indexed_at` value.
    let first = indexed_at_values[0];
    for val in &indexed_at_values {
        assert_eq!(
            *val, first,
            "all documents in the same batch must share the same indexed_at timestamp"
        );
    }

    universe.assert_quit().await;
    Ok(())
}
}
Loading