Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions locustdb-serialization/src/event_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::api::AnyVal;
use crate::default_reader_options;
Expand All @@ -11,10 +12,77 @@ pub struct EventBuffer {

#[derive(Default, Clone, Debug)]
pub struct TableBuffer {
pub len: u64,
pub columns: HashMap<String, ColumnBuffer>,
len: u64,
columns: HashMap<String, ColumnBuffer>,
}

impl TableBuffer {
    /// Creates a new `TableBuffer` from a set of named columns.
    ///
    /// The table length is the length of the longest column. Every column must
    /// either have exactly that length or be `ColumnData::Empty`.
    ///
    /// # Panics
    /// Panics if any non-empty column has a length different from the longest column.
    pub fn new(columns: HashMap<String, ColumnBuffer>) -> Self {
        let len = columns.values().map(|c| c.data.len()).max().unwrap_or(0) as u64;
        assert!(
            columns
                .values()
                .all(|c| c.data.len() == len as usize || matches!(c.data, ColumnData::Empty)),
            "All columns must have the same length"
        );
        Self { len, columns }
    }

    /// Number of rows in the table.
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// Returns `true` if the table contains no rows.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Inserts a full column under `column_name`, replacing any existing column
    /// with that name.
    ///
    /// If the table is currently empty it adopts the column's length.
    ///
    /// # Panics
    /// Panics if the table is non-empty and the column's length does not match
    /// the table's length.
    pub fn insert(&mut self, column_name: String, column: ColumnBuffer) {
        if self.len == 0 {
            self.len = column.data.len() as u64;
        } else if self.len != column.data.len() as u64 {
            panic!(
                "Column {} has length {} but table has length {}",
                column_name,
                column.data.len(),
                self.len
            );
        }
        self.columns.insert(column_name, column);
    }

    /// Appends a single row to the table. If the row does not provide a
    /// "timestamp" column, one is generated from the current wall-clock time
    /// (seconds since the Unix epoch, as `f64`).
    ///
    /// Returns the number of columns supplied by the caller (excluding any
    /// auto-generated timestamp column).
    pub fn push_row_and_timestamp<Row: IntoIterator<Item = (String, AnyVal)>>(
        &mut self,
        row: Row,
    ) -> usize {
        // Wall-clock time in fractional seconds since the Unix epoch.
        let timestamp_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as f64
            / 1000.0;
        let mut cols = 0;
        let mut timestamp_provided = false;
        for (column_name, value) in row {
            // Check before `entry` consumes `column_name`; this lets the owned
            // String be moved into the map instead of cloned per column.
            timestamp_provided |= column_name == "timestamp";
            self.columns
                .entry(column_name)
                .or_default()
                .push(value, self.len);
            cols += 1;
        }
        if !timestamp_provided {
            self.columns
                .entry("timestamp".to_string())
                .or_default()
                .push(AnyVal::Float(timestamp_secs), self.len);
        }
        self.len += 1;
        cols
    }

    /// Iterates over `(column name, column buffer)` pairs in arbitrary order.
    pub fn columns(&self) -> impl Iterator<Item = (&String, &ColumnBuffer)> {
        self.columns.iter()
    }

    /// Consumes the buffer and returns the underlying column map.
    pub fn into_columns(self) -> HashMap<String, ColumnBuffer> {
        self.columns
    }
}
#[derive(Default, Clone, Debug)]
pub struct ColumnBuffer {
pub data: ColumnData,
Expand Down
4 changes: 2 additions & 2 deletions src/bin/db_inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ fn main() {
);
if opts.wal > 2 {
for (name, table) in &segment.data.as_ref().tables {
println!(" Table {} has {} columns", name, table.columns.len());
println!(" Table {} has {} columns", name, table.columns().count());
if opts.wal > 3 {
for col in &table.columns {
for (col, _) in table.columns() {
println!(" {:?}", col);
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/bin/repl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct Opt {
#[structopt(long, name = "WAL_SIZE", default_value = "16777216")]
max_wal_size_bytes: u64,

/// Maximum number of WAL files before triggering compaction
#[structopt(long, name = "MAX_WAL_FILES", default_value = "1000")]
max_wal_files: usize,

/// Maximum size of partition files in bytes
#[structopt(long, name = "PART_SIZE", default_value = "8388608")]
max_partition_size_bytes: u64,
Expand Down Expand Up @@ -150,6 +154,7 @@ fn main() {
metrics_interval,
metrics_table_name,
io_threads,
max_wal_files,
} = Opt::from_args();

let options = locustdb::Options {
Expand All @@ -160,6 +165,7 @@ fn main() {
mem_lz4,
readahead: readahead * 1024 * 1024,
max_wal_size_bytes,
max_wal_files,
max_partition_size_bytes,
partition_combine_factor,
batch_size,
Expand Down
2 changes: 1 addition & 1 deletion src/bitvec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl BitVec for Vec<u8> {
}
}

impl BitVec for &'_ [u8] {
impl BitVec for [u8] {
fn is_set(&self, index: usize) -> bool {
let slot = index >> 3;
slot < self.len() && self[slot] & (1 << (index as u8 & 7)) > 0
Expand Down
77 changes: 48 additions & 29 deletions src/disk_store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct Storage {
meta_db_path: PathBuf,
tables_path: PathBuf,
meta_store: Arc<RwLock<MetaStore>>,
writer: Box<dyn BlobWriter + Send + Sync + 'static>,
writer: Arc<dyn BlobWriter + Send + Sync + 'static>,
perf_counter: Arc<PerfCounter>,

io_threadpool: Option<ThreadPool>,
Expand Down Expand Up @@ -107,16 +107,17 @@ impl Storage {
} else {
(Box::new(FileBlobWriter::new()), path.to_owned())
};
let writer = Box::new(VersionedChecksummedBlobWriter::new(writer));
let writer = Arc::new(VersionedChecksummedBlobWriter::new(writer));
let meta_db_path = path.join("meta");
let wal_dir = path.join("wal");
let tables_path = path.join("tables");
let (meta_store, wal_segments, wal_size) = Storage::recover(
writer.as_ref(),
writer.clone(),
&meta_db_path,
&wal_dir,
readonly,
perf_counter.as_ref(),
perf_counter.clone(),
io_threads,
);
let meta_store = Arc::new(RwLock::new(meta_store));
(
Expand All @@ -139,11 +140,12 @@ impl Storage {
}

fn recover(
writer: &dyn BlobWriter,
writer: Arc<dyn BlobWriter>,
meta_db_path: &Path,
wal_dir: &Path,
readonly: bool,
perf_counter: &PerfCounter,
perf_counter: Arc<PerfCounter>,
io_threads: usize,
) -> (MetaStore, Vec<WalSegment<'static>>, u64) {
let mut meta_store: MetaStore = if writer.exists(meta_db_path).unwrap() {
let data = writer.load(meta_db_path).unwrap();
Expand All @@ -153,40 +155,57 @@ impl Storage {
MetaStore::default()
};

let mut wal_segments = Vec::new();
let earliest_uncommited_wal_id = meta_store.earliest_uncommited_wal_id();
log::info!(
"Recovering from wal checkpoint {}",
earliest_uncommited_wal_id
);
let wal_files = writer.list(wal_dir).unwrap();
let num_wal_files = wal_files.len();
log::info!("Found {} wal segments", wal_files.len());
let mut wal_size = 0;

let threadpool = ThreadPool::new(io_threads.min(wal_files.len()).max(1));
let (tx, rx) = mpsc::channel();
for wal_file in wal_files {
let wal_data = writer.load(&wal_file).unwrap();
perf_counter.disk_read_wal(wal_data.len() as u64);
let wal_segment = WalSegment::deserialize(&wal_data).unwrap();
log::info!(
"Found wal segment {} with id {} and {} rows in {} tables",
wal_file.display(),
wal_segment.id,
wal_segment.data.tables.values().map(|t| t.len).sum::<u64>(),
wal_segment.data.tables.len(),
);
if wal_segment.id < earliest_uncommited_wal_id {
if readonly {
log::info!("Skipping wal segment {}", wal_file.display());
let tx = tx.clone();
let writer = writer.clone();
let perf_counter = perf_counter.clone();
threadpool.execute(move || {
let wal_data = writer.load(&wal_file).unwrap();
perf_counter.disk_read_wal(wal_data.len() as u64);
let wal_segment = WalSegment::deserialize(&wal_data).unwrap();
log::info!(
"Found wal segment {} with id {} and {} rows in {} tables",
wal_file.display(),
wal_segment.id,
wal_segment
.data
.tables
.values()
.map(|t| t.len())
.sum::<usize>(),
wal_segment.data.tables.len(),
);
tx.send((wal_file, wal_segment, wal_data.len() as u64)).unwrap();
});
}

let mut wal_size = 0;
let mut wal_segments = Vec::new();
for (path, wal_segment, size) in rx.iter().take(num_wal_files) {
if wal_segment.id < earliest_uncommited_wal_id {
if readonly {
log::info!("Skipping wal segment {}", path.display());
} else {
writer.delete(&path).unwrap();
log::info!("Deleting wal segment {}", path.display());
}
} else {
writer.delete(&wal_file).unwrap();
log::info!("Deleting wal segment {}", wal_file.display());
meta_store.register_wal_segment(wal_segment.id);
wal_size += size;
wal_segments.push(wal_segment);
}
} else {
meta_store.register_wal_segment(wal_segment.id);
wal_segments.push(wal_segment);
wal_size += wal_data.len() as u64;
}
}

wal_segments.sort_by_key(|s| s.id);

(meta_store, wal_segments, wal_size)
Expand Down
14 changes: 9 additions & 5 deletions src/engine/data_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ mod vec_data;

use ordered_float::OrderedFloat;

pub use self::types::*;
pub use self::data::*;
pub use self::vec_data::*;
pub use self::byte_slices::*;
pub use self::data::*;
pub use self::nullable_vec_data::*;
pub use self::scalar_data::*;
pub use self::types::*;
pub use self::val_rows::*;
pub use self::nullable_vec_data::*;
pub use self::vec_data::*;

#[allow(non_camel_case_types)]
pub type of64 = OrderedFloat<f64>;
pub type of64 = OrderedFloat<f64>;

pub fn vec_f64_to_vec_of64(vec: Vec<f64>) -> Vec<of64> {
unsafe { std::mem::transmute::<Vec<f64>, Vec<of64>>(vec) }
}
2 changes: 1 addition & 1 deletion src/engine/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ impl<'a> QueryExecutor<'a> {
if let Some(present) = scratchpad.try_get_null_map(output) {
print!(" null map: ");
for i in 0..cmp::min(present.len() * 8, 100) {
if (&*present).is_set(i) {
if (*present).is_set(i) {
print!("1")
} else {
print!("0")
Expand Down
4 changes: 2 additions & 2 deletions src/engine/operators/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ where
}

for i in 0..nums.len() {
if (&*present).is_set(i) {
if (*present).is_set(i) {
let g = grouping[i].cast_usize();
accumulators[g] = A::accumulate(accumulators[g], nums[i]);
accumulators_present.set(g);
Expand Down Expand Up @@ -411,7 +411,7 @@ where

let mut any_overflow = false;
for i in 0..nums.len() {
if (&*present).is_set(i) {
if (*present).is_set(i) {
let g = grouping[i].cast_usize();
let (result, overflow) = A::accumulate_checked(accumulators[g], nums[i]);
any_overflow |= overflow;
Expand Down
6 changes: 3 additions & 3 deletions src/engine/operators/binary_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl<'a, LHS, RHS, Out, Op> VecOperator<'a> for NullableCheckedBinaryOperator<LH
let mut any_overflow = false;
for (i, (l, r)) in lhs.iter().zip(rhs.iter()).enumerate() {
let (result, overflow) = Op::perform_checked(*l, *r);
any_overflow |= overflow && (&*present).is_set(i);
any_overflow |= overflow && (*present).is_set(i);
output.push(result);
}
if any_overflow { Err(QueryError::Overflow) } else { Ok(()) }
Expand Down Expand Up @@ -321,7 +321,7 @@ impl<'a, LHS, RHS, Out, Op> VecOperator<'a> for NullableCheckedBinaryVSOperator<
let mut any_overflow = false;
for (i, &l) in lhs.iter().enumerate() {
let (result, overflow) = Op::perform_checked(l, rhs);
any_overflow |= overflow && (&*present).is_set(i);
any_overflow |= overflow && (*present).is_set(i);
output.push(result);
}
if any_overflow { Err(QueryError::Overflow) } else { Ok(()) }
Expand Down Expand Up @@ -367,7 +367,7 @@ impl<'a, LHS, RHS, Out, Op> VecOperator<'a> for NullableCheckedBinarySVOperator<
let mut any_overflow = false;
for (i, &r) in rhs.iter().enumerate() {
let (result, overflow) = Op::perform_checked(lhs, r);
any_overflow |= overflow && (&*present).is_set(i);
any_overflow |= overflow && (*present).is_set(i);
output.push(result);
}
if any_overflow { Err(QueryError::Overflow) } else { Ok(()) }
Expand Down
2 changes: 1 addition & 1 deletion src/engine/operators/compact_nullable_nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl<'a, T: VecData<T> + 'a, U: GenericIntVec<U>> VecOperator<'a> for CompactNul
// Remove all unmodified entries
let mut j = 0;
for (i, &s) in select.iter().take(data.len()).enumerate() {
if s > U::zero() && (&*select_present).is_set(i) {
if s > U::zero() && (*select_present).is_set(i) {
data[j] = data[i];
if present.is_set(i) {
present.set(j);
Expand Down
2 changes: 1 addition & 1 deletion src/engine/operators/compact_with_nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl<'a, T: VecData<T> + 'a, U: GenericIntVec<U>> VecOperator<'a> for CompactWit
// Remove all unmodified entries
let mut j = 0;
for (i, &s) in select.iter().take(data.len()).enumerate() {
if s > U::zero() && (&*select_present).is_set(i) {
if s > U::zero() && (*select_present).is_set(i) {
data[j] = data[i];
j += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/engine/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
filtered.clear();
}
for i in 0..data.len() {
if filter[i] > 0 && (&*present).is_set(i) {
if filter[i] > 0 && (*present).is_set(i) {
filtered.push(data[i]);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/engine/operators/filter_nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ where
}
for (i, (d, &select)) in data.iter().zip(filter.iter()).enumerate() {
if select > 0 {
if BitVec::is_set(&&*present, i) {
if BitVec::is_set(&*present, i) {
filtered_present.set(filtered.len());
}
filtered.push(*d);
Expand Down Expand Up @@ -82,8 +82,8 @@ where
}
}
for i in 0..data.len() {
if filter[i] > 0 && (&*filter_present).is_set(i) {
if BitVec::is_set(&&*input_present, i) {
if filter[i] > 0 && (*filter_present).is_set(i) {
if BitVec::is_set(&*input_present, i) {
filtered_present.set(filtered.len());
}
filtered.push(data[i]);
Expand Down
Loading
Loading