Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# CI workflow: runs the test suite, a rustfmt check, and Clippy on pushes to
# `main`, on non-draft pull requests, and on manual dispatch.
name: Rust Tests

on:
  push:
    branches:
      - main
  pull_request:
    types: [opened, synchronize, reopened, ready_for_review]
  workflow_dispatch:

env:
  CARGO_TERM_COLOR: always
  # Quoted so YAML keeps the version as a string rather than a float
  # (an unquoted 33.10 would otherwise be read as 33.1).
  PROTOC_VERSION: "33.3"
  # Platform suffix of the protoc release archive downloaded by the install steps.
  ARCH: linux-x86_64

concurrency:
  # Pull requests share one group per head branch; other events group by ref.
  group: ${{ github.workflow }}-${{ github.event_name == 'pull_request' && github.head_ref || github.ref }}
  # Only pull-request runs are cancelled when a newer run starts.
  cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
  test:
    name: Test
    runs-on: ubuntu-latest
    # Skip draft pull requests; pushes always run.
    if: github.event_name == 'push' || !github.event.pull_request.draft
    steps:
      - name: Cache protoc
        id: cache-protoc
        uses: actions/cache@v4
        with:
          path: ~/.local/protoc
          key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.arch }}

      - name: Install protoc (if not cached)
        if: steps.cache-protoc.outputs.cache-hit != 'true'
        run: |
          set -euxo pipefail
          mkdir -p ~/.local/protoc
          # --fail makes curl exit non-zero on an HTTP error instead of saving the error page.
          curl --fail -L -o /tmp/protoc.zip \
            "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-${ARCH}.zip"
          unzip -q /tmp/protoc.zip -d ~/.local/protoc

      - name: Add protoc to PATH
        run: echo "$HOME/.local/protoc/bin" >> "$GITHUB_PATH"

      - uses: actions/checkout@v4
        with:
          lfs: true
      - uses: dsherret/rust-toolchain-file@v1
      - uses: Swatinem/rust-cache@v2
      - name: Run tests
        run: cargo test --all-features

  fmt:
    name: Rustfmt
    runs-on: ubuntu-latest
    # Skip draft pull requests; pushes always run.
    if: github.event_name == 'push' || !github.event.pull_request.draft
    steps:
      - uses: actions/checkout@v4
      - uses: dsherret/rust-toolchain-file@v1
      - run: cargo fmt --all -- --check

  clippy:
    name: Clippy
    runs-on: ubuntu-latest
    # Skip draft pull requests; pushes always run.
    if: github.event_name == 'push' || !github.event.pull_request.draft
    steps:
      - name: Cache protoc
        id: cache-protoc
        uses: actions/cache@v4
        with:
          path: ~/.local/protoc
          key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.arch }}

      - name: Install protoc (if not cached)
        if: steps.cache-protoc.outputs.cache-hit != 'true'
        run: |
          set -euxo pipefail
          mkdir -p ~/.local/protoc
          # --fail makes curl exit non-zero on an HTTP error instead of saving the error page.
          curl --fail -L -o /tmp/protoc.zip \
            "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-${ARCH}.zip"
          unzip -q /tmp/protoc.zip -d ~/.local/protoc

      - name: Add protoc to PATH
        run: echo "$HOME/.local/protoc/bin" >> "$GITHUB_PATH"

      # Same LFS handling as the test job: fetch only the objects for the
      # checked-out ref rather than `git lfs fetch --all` for every ref.
      - uses: actions/checkout@v4
        with:
          lfs: true
      - uses: dsherret/rust-toolchain-file@v1
      - uses: Swatinem/rust-cache@v2
      # Lint groups must carry the `clippy::` tool prefix; a bare `perf` does not name the group.
      - run: cargo clippy --all-targets --all-features -- -D clippy::correctness -D clippy::suspicious -D clippy::perf
20 changes: 7 additions & 13 deletions src/cli_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ pub struct ChunkConfig {
impl CliState {
/// Load CLI state from a YAML or JSON file
pub fn load(path: &Path) -> Result<Self> {
let content =
std::fs::read_to_string(path).context("Failed to read CLI state file")?;
let content = std::fs::read_to_string(path).context("Failed to read CLI state file")?;

serde_yaml::from_str(&content)
.context("Failed to parse CLI state file")
serde_yaml::from_str(&content).context("Failed to parse CLI state file")
}

/// Convert chunk configs to Chunk types grouped by dataset
Expand All @@ -42,16 +40,12 @@ impl CliState {
let chunks: Vec<Chunk> = chunk_configs
.iter()
.map(|cc| {
Chunk::new(
dataset.clone(),
cc.id.clone(),
cc.size,
cc.files.clone(),
Chunk::new(dataset.clone(), cc.id.clone(), cc.size, cc.files.clone()).map(
|mut chunk| {
chunk.summary = cc.summary.clone();
chunk
},
)
.map(|mut chunk| {
chunk.summary = cc.summary.clone();
chunk
})
})
.collect::<Result<Vec<_>>>()?;

Expand Down
8 changes: 4 additions & 4 deletions src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ClickhouseClient {
if row.stored_bytes < stale_threshold {
status = WorkerStatus::Stale;
}
if !parsed_version.as_ref().is_some_and(|ver| ver >= min_version) {
if parsed_version.as_ref().is_none_or(|ver| ver < min_version) {
status = WorkerStatus::UnsupportedVersion;
}
if row.timestamp < inactive_threshold {
Expand Down Expand Up @@ -222,8 +222,8 @@ impl From<Chunk> for ChunkRow {
id: chunk.id,
size: chunk.size as u64,
files: chunk.files.join(","),
last_block_hash: last_block_hash,
last_block_timestamp: last_block_timestamp,
last_block_hash,
last_block_timestamp,
}
}
}
Expand Down Expand Up @@ -277,7 +277,7 @@ mod test {
.await
.expect("Cannot store chunks");

let datasets = vec![dataset.to_string()];
let datasets = [dataset.to_string()];
let chunks_of_dataset = client
.get_existing_chunks(datasets.iter().map(|d| d.as_str()))
.await
Expand Down
5 changes: 1 addition & 4 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ impl Controller {
}

/// Load workers from a static configuration (CLI mode)
pub fn load_workers_from_config(
self,
workers: Vec<types::Worker>,
) -> WithWorkers {
pub fn load_workers_from_config(self, workers: Vec<types::Worker>) -> WithWorkers {
tracing::info!("Loaded {} workers from config", workers.len());

WithWorkers {
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Done");
Ok(())
}

async fn run_prod_mode(args: &cli::Args, config: cli::Config) -> anyhow::Result<WithAssignment> {
let db = clickhouse::ClickhouseClient::new(&args.clickhouse)
.await
Expand All @@ -70,8 +70,8 @@ async fn run_cli_mode(args: &cli::Args, config: cli::Config) -> anyhow::Result<W
.as_ref()
.context("CLI state file required for cli mode")?;

let cli_state = cli_state::CliState::load(state_file)
.context("Failed to load CLI state file")?;
let cli_state =
cli_state::CliState::load(state_file).context("Failed to load CLI state file")?;

let known_chunks = cli_state.to_chunks()?;
let workers = cli_state.workers;
Expand Down
20 changes: 11 additions & 9 deletions src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ pub fn calc_replication_factors(
size_by_weight: BTreeMap<ChunkWeight, u64>,
capacity: u64,
min_replication: u16,
max_replication: u16
max_replication: u16,
) -> Result<BTreeMap<ChunkWeight, ReplicationFactor>, ReplicationError> {
tracing::debug!("{:?} {:?} {:?} {:?}",
size_by_weight.iter().collect::<Vec<_>>(),
capacity,
min_replication,
max_replication
tracing::debug!(
"{:?} {:?} {:?} {:?}",
size_by_weight.iter().collect::<Vec<_>>(),
capacity,
min_replication,
max_replication
);

let total_size: u64 = size_by_weight.values().sum();
if total_size * min_replication as u64 > capacity {
return Err(ReplicationError::NotEnoughCapacity);
Expand Down Expand Up @@ -71,7 +72,7 @@ pub enum ReplicationError {

#[test]
fn test_replication() {
// Just setting this temporarily.
// Just setting this temporarily.
let max_replication_factor = 1200;
let size_by_weight: BTreeMap<_, _> = [(1, 4), (2, 1), (6, 1), (12, 1)].into_iter().collect();
assert!(matches!(
Expand Down Expand Up @@ -119,7 +120,8 @@ fn test_replication() {
(2400, [100, 200, 600, 1200]),
] {
let replication_factors =
calc_replication_factors(size_by_weight.clone(), capacity, 2, max_replication_factor).unwrap();
calc_replication_factors(size_by_weight.clone(), capacity, 2, max_replication_factor)
.unwrap();
assert_eq!(
replication_factors.values().copied().collect::<Vec<_>>(),
expected,
Expand Down
2 changes: 1 addition & 1 deletion src/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ fn assign_chunks(
for &(_, worker_index) in candidates {
if let Some(min_ver) = chunk.minimum_worker_version {
let worker_ver = &workers[worker_index as usize].version;
if !worker_ver.as_ref().is_some_and(|v| v >= min_ver) {
if worker_ver.as_ref().is_none_or(|v| v < min_ver) {
continue;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl S3Storage {
let s3_config = aws_sdk_s3::config::Builder::from(sdk_config)
.force_path_style(true)
.build();

let client = s3::Client::from_conf(s3_config);
Self { client }
}
Expand Down
16 changes: 9 additions & 7 deletions src/tests/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn test_scheduling_uniform() {
let mut sizes: Vec<u64> = vec![0; workers.len()];
for (worker_index, chunk_indexes) in assignment.worker_chunks.into_values().enumerate() {
for chunk_index in chunk_indexes {
sizes[worker_index as usize] += chunks[chunk_index as usize].size as u64;
sizes[worker_index] += chunks[chunk_index as usize].size as u64;
}
}
let stats = Stats::new(sizes.iter().copied());
Expand Down Expand Up @@ -261,7 +261,7 @@ fn test_minimum_worker_version_filtering() {
// Versioned chunks must only be assigned to eligible workers
for (worker_id, chunk_indexes) in &assignment.worker_chunks {
if !eligible_ids.contains(worker_id) {
let has_versioned = chunk_indexes.iter().any(|&i| (i as u32) < N_VERSIONED);
let has_versioned = chunk_indexes.iter().any(|&i| i < N_VERSIONED);
assert!(
!has_versioned,
"Ineligible worker {:?} was assigned version-restricted chunks",
Expand All @@ -275,7 +275,7 @@ fn test_minimum_worker_version_filtering() {
let versioned_count = assignment
.worker_chunks
.get(worker_id)
.map(|idxs| idxs.iter().filter(|&&i| (i as u32) < N_VERSIONED).count())
.map(|idxs| idxs.iter().filter(|&&i| i < N_VERSIONED).count())
.unwrap_or(0);
assert!(
versioned_count > 0,
Expand Down Expand Up @@ -347,8 +347,8 @@ fn test_minimum_worker_version_unrestricted_spill() {
let mut map: BTreeMap<u32, std::collections::BTreeSet<PeerId>> = BTreeMap::new();
for (&worker_id, chunk_indexes) in &assignment.worker_chunks {
for &idx in chunk_indexes {
if idx as u32 >= N_VERSIONED {
map.entry(idx as u32).or_default().insert(worker_id);
if idx >= N_VERSIONED {
map.entry(idx).or_default().insert(worker_id);
}
}
}
Expand All @@ -360,7 +360,7 @@ fn test_minimum_worker_version_unrestricted_spill() {
let mut changed_replicas: u64 = 0;
let mut total_replicas: u64 = 0;
for i in N_VERSIONED..N_CHUNKS {
let idx = i as u32;
let idx = i;
let workers_a = restricted_map.get(&idx).cloned().unwrap_or_default();
let workers_b = baseline_map.get(&idx).cloned().unwrap_or_default();
let changed = workers_a.symmetric_difference(&workers_b).count() as u64 / 2;
Expand Down Expand Up @@ -562,6 +562,7 @@ fn test_minimum_worker_version_eligible_capacity_exceeded() {
/// all workers have upgraded. Compares:
/// - Schedule with minimum_worker_version set and all workers eligible
/// - Schedule with minimum_worker_version removed (None)
///
/// With all workers eligible, the version check is a no-op. The only difference
/// is the sort order, but since s=0.95 leaves ample headroom, no worker hits
/// capacity — so processing order doesn't affect which worker each chunk lands on.
Expand Down Expand Up @@ -610,7 +611,8 @@ fn test_minimum_worker_version_no_reassignment_on_removal() {
total_removed == 0 && total_added == 0,
"Expected zero churn when removing restriction with all workers upgraded, \
but got removed = {}, added = {}",
total_removed, total_added,
total_removed,
total_added,
);
}

Expand Down
16 changes: 8 additions & 8 deletions src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ pub fn compare_intersection(
let removed_chunks = chunks1.difference(&chunks2).copied().collect_vec();
let added_chunks = chunks2.difference(&chunks1).copied().collect_vec();

let removed = calculate_size(&chunks, removed_chunks.iter().copied());
let added = calculate_size(&chunks, added_chunks.iter().copied());
let before = calculate_size(&chunks, chunks1.iter().copied());
let after = calculate_size(&chunks, chunks2.iter().copied());
result.removed.insert(worker_id.clone(), removed);
result.added.insert(worker_id.clone(), added);
result.before.insert(worker_id.clone(), before);
result.after.insert(worker_id.clone(), after);
let removed = calculate_size(chunks, removed_chunks.iter().copied());
let added = calculate_size(chunks, added_chunks.iter().copied());
let before = calculate_size(chunks, chunks1.iter().copied());
let after = calculate_size(chunks, chunks2.iter().copied());
result.removed.insert(*worker_id, removed);
result.added.insert(*worker_id, added);
result.before.insert(*worker_id, before);
result.after.insert(*worker_id, after);
}

result
Expand Down
14 changes: 7 additions & 7 deletions src/types/assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl Assignment {
.min();

let min_bytes_per_worker = config.worker_stale_bytes;
if let Some(min) = min {
if min < min_bytes_per_worker {
tracing::warn!(
"Some workers have less than the minimum required storage: {min} < {min_bytes_per_worker}"
);
crate::metrics::failure("min_assignment");
}
if let Some(min) = min
&& min < min_bytes_per_worker
{
tracing::warn!(
"Some workers have less than the minimum required storage: {min} < {min_bytes_per_worker}"
);
crate::metrics::failure("min_assignment");
}

crate::metrics::report_replication_factors(config.datasets.iter().map(|(ds, segments)| {
Expand Down
2 changes: 1 addition & 1 deletion src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Uploader {
let s3_config = aws_sdk_s3::config::Builder::from(sdk_config)
.force_path_style(true)
.build();

let client = s3::Client::from_conf(s3_config);
let time = Utc::now();
crate::metrics::ASSIGNMENT_TIMESTAMP.set(time.timestamp());
Expand Down
Loading