diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..4ed10b4 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,91 @@ +name: Rust Tests + +on: + push: + branches: + - main + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + PROTOC_VERSION: 33.3 + ARCH: linux-x86_64 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name == 'pull_request' && github.head_ref || github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + +jobs: + test: + name: Test + runs-on: ubuntu-latest + if: github.event_name == 'push' || !github.event.pull_request.draft + steps: + - name: Cache protoc + id: cache-protoc + uses: actions/cache@v4 + with: + path: ~/.local/protoc + key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.arch }} + + - name: Install protoc (if not cached) + if: steps.cache-protoc.outputs.cache-hit != 'true' + run: | + set -euxo pipefail + mkdir -p ~/.local/protoc + curl -L -o /tmp/protoc.zip \ + "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-${ARCH}.zip" + unzip -q /tmp/protoc.zip -d ~/.local/protoc + + - name: Add protoc to PATH + run: echo "$HOME/.local/protoc/bin" >> "$GITHUB_PATH" + + - uses: actions/checkout@v4 + with: + lfs: true + - uses: dsherret/rust-toolchain-file@v1 + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test --all-features + + fmt: + name: Rustfmt + runs-on: ubuntu-latest + if: github.event_name == 'push' || !github.event.pull_request.draft + steps: + - uses: actions/checkout@v4 + - uses: dsherret/rust-toolchain-file@v1 + - run: cargo fmt --all -- --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + if: github.event_name == 'push' || !github.event.pull_request.draft + steps: + - name: Cache protoc + id: cache-protoc + uses: actions/cache@v4 + with: + path: ~/.local/protoc + key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.arch }} + + - name: Install protoc (if not cached) + if: steps.cache-protoc.outputs.cache-hit != 'true' + run: | + set -euxo pipefail + mkdir -p ~/.local/protoc + curl -L -o /tmp/protoc.zip \ + "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-${ARCH}.zip" + unzip -q /tmp/protoc.zip -d ~/.local/protoc + + - name: Add protoc to PATH + run: echo "$HOME/.local/protoc/bin" >> "$GITHUB_PATH" + + - uses: actions/checkout@v4 + - name: Fetch LFS files + run: git lfs fetch --all && git lfs checkout + - uses: dsherret/rust-toolchain-file@v1 + - uses: Swatinem/rust-cache@v2 + - run: cargo clippy --all-targets --all-features -- -D clippy::correctness -D clippy::suspicious -D perf diff --git a/src/cli_state.rs b/src/cli_state.rs index c3a3da0..6c0aa98 100644 --- a/src/cli_state.rs +++ b/src/cli_state.rs @@ -26,11 +26,9 @@ pub struct ChunkConfig { impl CliState { /// Load CLI state from a YAML or JSON file pub fn load(path: &Path) -> Result { - let content = - std::fs::read_to_string(path).context("Failed to read CLI state file")?; + let content = std::fs::read_to_string(path).context("Failed to read CLI state file")?; - serde_yaml::from_str(&content) - .context("Failed to parse CLI state file") + serde_yaml::from_str(&content).context("Failed to parse CLI state file") } /// Convert chunk configs to Chunk types grouped by dataset @@ -42,16 +40,12 @@ impl CliState { let chunks: Vec = chunk_configs .iter() .map(|cc| { - Chunk::new( - dataset.clone(), - cc.id.clone(), - cc.size, - cc.files.clone(), + Chunk::new(dataset.clone(), cc.id.clone(), cc.size, cc.files.clone()).map( + |mut chunk| { + chunk.summary = cc.summary.clone(); + chunk + }, ) - .map(|mut chunk| { - chunk.summary = cc.summary.clone(); - chunk - }) }) .collect::>>()?; diff --git a/src/clickhouse.rs b/src/clickhouse.rs index f45b612..f26c2b2 100644 --- a/src/clickhouse.rs +++ b/src/clickhouse.rs @@ -97,7 +97,7 @@ impl ClickhouseClient { if row.stored_bytes < stale_threshold { status = WorkerStatus::Stale; } - if !parsed_version.as_ref().is_some_and(|ver| ver >= min_version) { + if parsed_version.as_ref().is_none_or(|ver| ver < min_version) { status = WorkerStatus::UnsupportedVersion; } if row.timestamp < inactive_threshold { @@ -222,8 +222,8 @@ impl From for ChunkRow { id: chunk.id, size: chunk.size as u64, files: chunk.files.join(","), - last_block_hash: last_block_hash, - last_block_timestamp: last_block_timestamp, + last_block_hash, + last_block_timestamp, } } } @@ -277,7 +277,7 @@ mod test { .await .expect("Cannot store chunks"); - let datasets = vec![dataset.to_string()]; + let datasets = [dataset.to_string()]; let chunks_of_dataset = client .get_existing_chunks(datasets.iter().map(|d| d.as_str())) .await diff --git a/src/controller.rs b/src/controller.rs index a0e8a4d..13efbea 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -42,10 +42,7 @@ impl Controller { } /// Load workers from a static configuration (CLI mode) - pub fn load_workers_from_config( - self, - workers: Vec, - ) -> WithWorkers { + pub fn load_workers_from_config(self, workers: Vec) -> WithWorkers { tracing::info!("Loaded {} workers from config", workers.len()); WithWorkers { diff --git a/src/main.rs b/src/main.rs index d02f86f..c34a2ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Done"); Ok(()) } - + async fn run_prod_mode(args: &cli::Args, config: cli::Config) -> anyhow::Result { let db = clickhouse::ClickhouseClient::new(&args.clickhouse) .await @@ -70,8 +70,8 @@ async fn run_cli_mode(args: &cli::Args, config: cli::Config) -> anyhow::Result, capacity: u64, min_replication: u16, - max_replication: u16 + max_replication: u16, ) -> Result, ReplicationError> { - tracing::debug!("{:?} {:?} {:?} {:?}", - size_by_weight.iter().collect::>(), - capacity, - min_replication, - max_replication + tracing::debug!( + "{:?} {:?} {:?} {:?}", + size_by_weight.iter().collect::>(), + capacity, + min_replication, + max_replication ); - + let total_size: u64 = size_by_weight.values().sum(); if total_size * min_replication as u64 > capacity { return Err(ReplicationError::NotEnoughCapacity); @@ -71,7 +72,7 @@ pub enum ReplicationError { #[test] fn test_replication() { - // Just setting this temporarily. + // Just setting this temporarily. let max_replication_factor = 1200; let size_by_weight: BTreeMap<_, _> = [(1, 4), (2, 1), (6, 1), (12, 1)].into_iter().collect(); assert!(matches!( @@ -119,7 +120,8 @@ fn test_replication() { (2400, [100, 200, 600, 1200]), ] { let replication_factors = - calc_replication_factors(size_by_weight.clone(), capacity, 2, max_replication_factor).unwrap(); + calc_replication_factors(size_by_weight.clone(), capacity, 2, max_replication_factor) + .unwrap(); assert_eq!( replication_factors.values().copied().collect::>(), expected, diff --git a/src/scheduling.rs b/src/scheduling.rs index 4da8a7e..ab581b8 100644 --- a/src/scheduling.rs +++ b/src/scheduling.rs @@ -326,7 +326,7 @@ fn assign_chunks( for &(_, worker_index) in candidates { if let Some(min_ver) = chunk.minimum_worker_version { let worker_ver = &workers[worker_index as usize].version; - if !worker_ver.as_ref().is_some_and(|v| v >= min_ver) { + if worker_ver.as_ref().is_none_or(|v| v < min_ver) { continue; } } diff --git a/src/storage.rs b/src/storage.rs index 5b53881..1b65485 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -25,7 +25,7 @@ impl S3Storage { let s3_config = aws_sdk_s3::config::Builder::from(sdk_config) .force_path_style(true) .build(); - + let client = s3::Client::from_conf(s3_config); Self { client } } diff --git a/src/tests/scheduling.rs b/src/tests/scheduling.rs index 5d0fd3a..8729f56 100644 --- a/src/tests/scheduling.rs +++ b/src/tests/scheduling.rs @@ -70,7 +70,7 @@ fn test_scheduling_uniform() { let mut sizes: Vec = vec![0; workers.len()]; for (worker_index, chunk_indexes) in assignment.worker_chunks.into_values().enumerate() { for chunk_index in chunk_indexes { - sizes[worker_index as usize] += chunks[chunk_index as usize].size as u64; + sizes[worker_index] += chunks[chunk_index as usize].size as u64; } } let stats = Stats::new(sizes.iter().copied()); @@ -261,7 +261,7 @@ fn test_minimum_worker_version_filtering() { // Versioned chunks must only be assigned to eligible workers for (worker_id, chunk_indexes) in &assignment.worker_chunks { if !eligible_ids.contains(worker_id) { - let has_versioned = chunk_indexes.iter().any(|&i| (i as u32) < N_VERSIONED); + let has_versioned = chunk_indexes.iter().any(|&i| i < N_VERSIONED); assert!( !has_versioned, "Ineligible worker {:?} was assigned version-restricted chunks", @@ -275,7 +275,7 @@ fn test_minimum_worker_version_filtering() { let versioned_count = assignment .worker_chunks .get(worker_id) - .map(|idxs| idxs.iter().filter(|&&i| (i as u32) < N_VERSIONED).count()) + .map(|idxs| idxs.iter().filter(|&&i| i < N_VERSIONED).count()) .unwrap_or(0); assert!( versioned_count > 0, @@ -347,8 +347,8 @@ fn test_minimum_worker_version_unrestricted_spill() { let mut map: BTreeMap> = BTreeMap::new(); for (&worker_id, chunk_indexes) in &assignment.worker_chunks { for &idx in chunk_indexes { - if idx as u32 >= N_VERSIONED { - map.entry(idx as u32).or_default().insert(worker_id); + if idx >= N_VERSIONED { + map.entry(idx).or_default().insert(worker_id); } } } @@ -360,7 +360,7 @@ fn test_minimum_worker_version_unrestricted_spill() { let mut changed_replicas: u64 = 0; let mut total_replicas: u64 = 0; for i in N_VERSIONED..N_CHUNKS { - let idx = i as u32; + let idx = i; let workers_a = restricted_map.get(&idx).cloned().unwrap_or_default(); let workers_b = baseline_map.get(&idx).cloned().unwrap_or_default(); let changed = workers_a.symmetric_difference(&workers_b).count() as u64 / 2; @@ -562,6 +562,7 @@ fn test_minimum_worker_version_eligible_capacity_exceeded() { /// all workers have upgraded. Compares: /// - Schedule with minimum_worker_version set and all workers eligible /// - Schedule with minimum_worker_version removed (None) +/// /// With all workers eligible, the version check is a no-op. The only difference /// is the sort order, but since s=0.95 leaves ample headroom, no worker hits /// capacity — so processing order doesn't affect which worker each chunk lands on. @@ -610,7 +611,8 @@ fn test_minimum_worker_version_no_reassignment_on_removal() { total_removed == 0 && total_added == 0, "Expected zero churn when removing restriction with all workers upgraded, \ but got removed = {}, added = {}", - total_removed, total_added, + total_removed, + total_added, ); } diff --git a/src/tests/utils.rs b/src/tests/utils.rs index 2e49ac5..8bb29ed 100644 --- a/src/tests/utils.rs +++ b/src/tests/utils.rs @@ -84,14 +84,14 @@ pub fn compare_intersection( let removed_chunks = chunks1.difference(&chunks2).copied().collect_vec(); let added_chunks = chunks2.difference(&chunks1).copied().collect_vec(); - let removed = calculate_size(&chunks, removed_chunks.iter().copied()); - let added = calculate_size(&chunks, added_chunks.iter().copied()); - let before = calculate_size(&chunks, chunks1.iter().copied()); - let after = calculate_size(&chunks, chunks2.iter().copied()); - result.removed.insert(worker_id.clone(), removed); - result.added.insert(worker_id.clone(), added); - result.before.insert(worker_id.clone(), before); - result.after.insert(worker_id.clone(), after); + let removed = calculate_size(chunks, removed_chunks.iter().copied()); + let added = calculate_size(chunks, added_chunks.iter().copied()); + let before = calculate_size(chunks, chunks1.iter().copied()); + let after = calculate_size(chunks, chunks2.iter().copied()); + result.removed.insert(*worker_id, removed); + result.added.insert(*worker_id, added); + result.before.insert(*worker_id, before); + result.after.insert(*worker_id, after); } result diff --git a/src/types/assignment.rs b/src/types/assignment.rs index 4b36f13..c0859de 100644 --- a/src/types/assignment.rs +++ b/src/types/assignment.rs @@ -53,13 +53,13 @@ impl Assignment { .min(); let min_bytes_per_worker = config.worker_stale_bytes; - if let Some(min) = min { - if min < min_bytes_per_worker { - tracing::warn!( - "Some workers have less than the minimum required storage: {min} < {min_bytes_per_worker}" - ); - crate::metrics::failure("min_assignment"); - } + if let Some(min) = min + && min < min_bytes_per_worker + { + tracing::warn!( + "Some workers have less than the minimum required storage: {min} < {min_bytes_per_worker}" + ); + crate::metrics::failure("min_assignment"); } crate::metrics::report_replication_factors(config.datasets.iter().map(|(ds, segments)| { diff --git a/src/upload.rs b/src/upload.rs index 1378d46..d2712ab 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -24,7 +24,7 @@ impl Uploader { let s3_config = aws_sdk_s3::config::Builder::from(sdk_config) .force_path_style(true) .build(); - + let client = s3::Client::from_conf(s3_config); let time = Utc::now(); crate::metrics::ASSIGNMENT_TIMESTAMP.set(time.timestamp());