Skip to content

Commit c8d269e

Browse files
Add validation for numeric config fields
1 parent e26702e commit c8d269e

3 files changed

Lines changed: 168 additions & 16 deletions

File tree

crates/fluss/src/client/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ impl FlussConnection {
4343
pub async fn new(arg: Config) -> Result<Self> {
4444
arg.validate_security()
4545
.map_err(|msg| Error::IllegalArgument { message: msg })?;
46-
arg.validate_scanner_fetch()
46+
arg.validate_scanner()
47+
.map_err(|msg| Error::IllegalArgument { message: msg })?;
48+
arg.validate_writer()
4749
.map_err(|msg| Error::IllegalArgument { message: msg })?;
4850

4951
let timeout = Duration::from_millis(arg.connect_timeout_ms);

crates/fluss/src/client/table/remote_log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ impl RemoteLogDownloader {
778778
let fetcher = Arc::new(ProductionFetcher {
779779
credentials_rx,
780780
local_log_dir: Arc::new(local_log_dir),
781-
remote_log_read_concurrency: remote_log_read_concurrency.max(1),
781+
remote_log_read_concurrency,
782782
});
783783

784784
Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)

crates/fluss/src/config.rs

Lines changed: 164 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,16 @@ impl Config {
324324
}
325325
Ok(())
326326
}
327-
pub fn validate_scanner_fetch(&self) -> Result<(), String> {
327+
pub fn validate_scanner(&self) -> Result<(), String> {
328+
if self.scanner_remote_log_prefetch_num == 0 {
329+
return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
330+
}
331+
if self.scanner_remote_log_read_concurrency == 0 {
332+
return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
333+
}
334+
if self.remote_file_download_thread_num == 0 {
335+
return Err("remote_file_download_thread_num must be > 0".to_string());
336+
}
328337
if self.scanner_log_fetch_min_bytes <= 0 {
329338
return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
330339
}
@@ -350,6 +359,57 @@ impl Config {
350359
}
351360
Ok(())
352361
}
362+
363+
pub fn validate_writer(&self) -> Result<(), String> {
364+
if self.writer_request_max_size <= 0 {
365+
return Err("writer_request_max_size must be > 0".to_string());
366+
}
367+
if self.writer_batch_size <= 0 {
368+
return Err("writer_batch_size must be > 0".to_string());
369+
}
370+
if self.writer_batch_timeout_ms < 0 {
371+
return Err("writer_batch_timeout_ms must be >= 0".to_string());
372+
}
373+
if self.writer_max_inflight_requests_per_bucket == 0 {
374+
return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string());
375+
}
376+
if self.writer_buffer_memory_size == 0 {
377+
return Err("writer_buffer_memory_size must be > 0".to_string());
378+
}
379+
if self.writer_batch_size > self.writer_request_max_size {
380+
return Err("writer_batch_size must be <= writer_request_max_size".to_string());
381+
}
382+
if self.writer_batch_size as usize > self.writer_buffer_memory_size {
383+
return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string());
384+
}
385+
// idempotence checks
386+
if !self.writer_enable_idempotence {
387+
return Ok(());
388+
}
389+
let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
390+
if !acks_is_all {
391+
return Err(format!(
392+
"Idempotent writes require acks='all' (-1), but got acks='{}'",
393+
self.writer_acks
394+
));
395+
}
396+
if self.writer_retries <= 0 {
397+
return Err(format!(
398+
"Idempotent writes require retries > 0, but got retries={}",
399+
self.writer_retries
400+
));
401+
}
402+
if self.writer_max_inflight_requests_per_bucket
403+
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
404+
{
405+
return Err(format!(
406+
"Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
407+
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
408+
self.writer_max_inflight_requests_per_bucket
409+
));
410+
}
411+
Ok(())
412+
}
353413
}
354414

355415
#[cfg(test)]
@@ -419,13 +479,38 @@ mod tests {
419479
};
420480
assert!(config.validate_security().is_err());
421481
}
482+
422483
#[test]
423-
fn test_scanner_fetch_defaults_valid() {
484+
fn test_scanner_defaults_valid() {
424485
let config = Config::default();
425-
assert!(config.validate_scanner_fetch().is_ok());
426-
assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
427-
assert_eq!(config.scanner_log_fetch_min_bytes, 1);
428-
assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
486+
assert!(config.validate_scanner().is_ok());
487+
}
488+
489+
#[test]
490+
fn test_scanner_remote_log_prefetch_num_zero() {
491+
let config = Config {
492+
scanner_remote_log_prefetch_num: 0,
493+
..Config::default()
494+
};
495+
assert!(config.validate_scanner().is_err());
496+
}
497+
498+
#[test]
499+
fn test_scanner_remote_log_read_concurrency_zero() {
500+
let config = Config {
501+
scanner_remote_log_read_concurrency: 0,
502+
..Config::default()
503+
};
504+
assert!(config.validate_scanner().is_err());
505+
}
506+
507+
#[test]
508+
fn test_remote_file_download_thread_num_zero() {
509+
let config = Config {
510+
remote_file_download_thread_num: 0,
511+
..Config::default()
512+
};
513+
assert!(config.validate_scanner().is_err());
429514
}
430515

431516
#[test]
@@ -435,7 +520,7 @@ mod tests {
435520
scanner_log_fetch_max_bytes: 1,
436521
..Config::default()
437522
};
438-
assert!(config.validate_scanner_fetch().is_err());
523+
assert!(config.validate_scanner().is_err());
439524
}
440525

441526
#[test]
@@ -444,13 +529,78 @@ mod tests {
444529
scanner_log_fetch_wait_max_time_ms: -1,
445530
..Config::default()
446531
};
447-
assert!(config.validate_scanner_fetch().is_err());
532+
assert!(config.validate_scanner().is_err());
448533
}
449534

450535
#[test]
451-
fn test_idempotence_default_is_valid() {
536+
fn test_writer_defaults_valid() {
452537
let config = Config::default();
453-
assert!(config.validate_idempotence().is_ok());
538+
assert!(config.validate_writer().is_ok());
539+
}
540+
541+
#[test]
542+
fn test_writer_request_max_size_zero() {
543+
let config = Config {
544+
writer_request_max_size: 0,
545+
..Config::default()
546+
};
547+
assert!(config.validate_writer().is_err());
548+
}
549+
550+
#[test]
551+
fn test_writer_batch_size_zero() {
552+
let config = Config {
553+
writer_batch_size: 0,
554+
..Config::default()
555+
};
556+
assert!(config.validate_writer().is_err());
557+
}
558+
559+
#[test]
560+
fn test_writer_batch_timeout_negative() {
561+
let config = Config {
562+
writer_batch_timeout_ms: -1,
563+
..Config::default()
564+
};
565+
assert!(config.validate_writer().is_err());
566+
}
567+
568+
#[test]
569+
fn test_writer_max_inflight_requests_per_bucket_zero() {
570+
let config = Config {
571+
writer_max_inflight_requests_per_bucket: 0,
572+
..Config::default()
573+
};
574+
assert!(config.validate_writer().is_err());
575+
}
576+
577+
#[test]
578+
fn test_writer_buffer_memory_size_zero() {
579+
let config = Config {
580+
writer_buffer_memory_size: 0,
581+
..Config::default()
582+
};
583+
assert!(config.validate_writer().is_err());
584+
}
585+
586+
#[test]
587+
fn test_writer_batch_size_exceeds_request_max_size() {
588+
let config = Config {
589+
writer_batch_size: 20 * 1024 * 1024,
590+
writer_request_max_size: 10 * 1024 * 1024,
591+
..Config::default()
592+
};
593+
assert!(config.validate_writer().is_err());
594+
}
595+
596+
#[test]
597+
fn test_writer_batch_size_exceeds_buffer_memory_size() {
598+
let config = Config {
599+
writer_batch_size: 128 * 1024 * 1024,
600+
writer_buffer_memory_size: 64 * 1024 * 1024,
601+
..Config::default()
602+
};
603+
assert!(config.validate_writer().is_err());
454604
}
455605

456606
#[test]
@@ -462,7 +612,7 @@ mod tests {
462612
writer_max_inflight_requests_per_bucket: 100,
463613
..Config::default()
464614
};
465-
assert!(config.validate_idempotence().is_ok());
615+
assert!(config.validate_writer().is_ok());
466616
}
467617

468618
#[test]
@@ -472,7 +622,7 @@ mod tests {
472622
writer_acks: "1".to_string(),
473623
..Config::default()
474624
};
475-
assert!(config.validate_idempotence().is_err());
625+
assert!(config.validate_writer().is_err());
476626
}
477627

478628
#[test]
@@ -482,7 +632,7 @@ mod tests {
482632
writer_retries: 0,
483633
..Config::default()
484634
};
485-
assert!(config.validate_idempotence().is_err());
635+
assert!(config.validate_writer().is_err());
486636
}
487637

488638
#[test]
@@ -492,6 +642,6 @@ mod tests {
492642
writer_max_inflight_requests_per_bucket: 10,
493643
..Config::default()
494644
};
495-
assert!(config.validate_idempotence().is_err());
645+
assert!(config.validate_writer().is_err());
496646
}
497647
}

0 commit comments

Comments
 (0)