From 3b8c74e495106ad92770cc00a7821fbbd3f658d0 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 30 Mar 2026 13:06:25 -0600 Subject: [PATCH 1/4] input_chunk: use projected fs growth for storage.total_limit_size eviction Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 315 +++++++++++++++++++++++++++++------------- 1 file changed, 221 insertions(+), 94 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 0955f7affa3..cc41230b6e5 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -248,6 +248,19 @@ static int flb_input_chunk_drop_task_route( ssize_t *dropped_record_count, ssize_t *dropped_byte_count); +static int flb_input_chunk_has_other_routes( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins); + +static int flb_input_chunk_prefers_physical_delete( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins, + int release_scope); + +static size_t flb_input_chunk_get_projected_write_size( + struct flb_input_chunk *ic, + size_t append_size); + static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk) { @@ -300,6 +313,8 @@ static int flb_input_chunk_release_space( ssize_t *required_space, int release_scope) { + int pass; + int pass_limit; struct mk_list *input_chunk_iterator_tmp; struct mk_list *input_chunk_iterator; struct flb_router *router; @@ -313,122 +328,140 @@ static int flb_input_chunk_release_space( released_space = 0; - mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, - &input_plugin->chunks) { - old_input_chunk = mk_list_entry(input_chunk_iterator, - struct flb_input_chunk, _head); + pass_limit = 1; + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { + pass_limit = 2; + } - if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, - output_plugin->id, - input_plugin->config->router)) { - continue; - } + for (pass = 0; pass < pass_limit; pass++) { + mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp, + &input_plugin->chunks) { + old_input_chunk = mk_list_entry(input_chunk_iterator, + struct flb_input_chunk, _head); - if (flb_input_chunk_safe_delete(new_input_chunk, - old_input_chunk, - output_plugin->id) == FLB_FALSE) { - continue; - } + if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, + output_plugin->id, + input_plugin->config->router)) { + continue; + } - if (flb_input_chunk_drop_task_route(old_input_chunk->task, - output_plugin, - &dropped_record_count, - &dropped_byte_count) == FLB_FALSE) { - continue; - } + if (flb_input_chunk_safe_delete(new_input_chunk, + old_input_chunk, + output_plugin->id) == FLB_FALSE) { + continue; + } - chunk_size = flb_input_chunk_get_real_size(old_input_chunk); - chunk_released = FLB_FALSE; - chunk_destroy_flag = FLB_FALSE; + if (pass == 0 && + flb_input_chunk_prefers_physical_delete(old_input_chunk, + output_plugin, + release_scope) == FLB_FALSE) { + continue; + } - if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { - flb_routes_mask_clear_bit(old_input_chunk->routes_mask, - output_plugin->id, - input_plugin->config->router); + if (flb_input_chunk_drop_task_route(old_input_chunk->task, + output_plugin, + &dropped_record_count, + &dropped_byte_count) == FLB_FALSE) { + continue; + } - FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); - output_plugin->fs_chunks_size -= chunk_size; + chunk_size = flb_input_chunk_get_real_size(old_input_chunk); + chunk_released = FLB_FALSE; + chunk_destroy_flag = FLB_FALSE; - chunk_destroy_flag = flb_routes_mask_is_empty( - old_input_chunk->routes_mask, - input_plugin->config->router); + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { + flb_routes_mask_clear_bit(old_input_chunk->routes_mask, + output_plugin->id, + input_plugin->config->router); - chunk_released = FLB_TRUE; - } - else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { - chunk_destroy_flag = FLB_TRUE; - } + FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); + output_plugin->fs_chunks_size -= chunk_size; -#ifdef FLB_HAVE_METRICS - if (dropped_record_count < 0) { - dropped_record_count = get_input_chunk_record_count(old_input_chunk); - } + chunk_destroy_flag = flb_routes_mask_is_empty( + old_input_chunk->routes_mask, + input_plugin->config->router); - if (dropped_byte_count < 0) { - dropped_byte_count = chunk_size; - } + chunk_released = FLB_TRUE; + } + else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { + chunk_destroy_flag = FLB_TRUE; + } - if (dropped_record_count == -1) { - flb_debug("[task] error getting chunk record count : %s", - old_input_chunk->in->name); - } - else if (dropped_record_count > 0) { - cmt_counter_add(output_plugin->cmt_dropped_records, - cfl_time_now(), - dropped_record_count, - 1, (char *[]) {(char *) flb_output_name(output_plugin)}); +#ifdef FLB_HAVE_METRICS + if (dropped_record_count < 0) { + dropped_record_count = get_input_chunk_record_count(old_input_chunk); + } - if (input_plugin->config && input_plugin->config->router && - old_input_chunk->event_type == FLB_INPUT_LOGS) { - router = input_plugin->config->router; + if (dropped_byte_count < 0) { + dropped_byte_count = chunk_size; + } - cmt_counter_add(router->logs_drop_records_total, + if (dropped_record_count == -1) { + flb_debug("[task] error getting chunk record count : %s", + old_input_chunk->in->name); + } + else if (dropped_record_count > 0) { + cmt_counter_add(output_plugin->cmt_dropped_records, cfl_time_now(), - (double) dropped_record_count, - 2, - (char *[]){(char *) flb_input_name(old_input_chunk->in), - (char *) flb_output_name(output_plugin)}); + dropped_record_count, + 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + + if (input_plugin->config && input_plugin->config->router && + old_input_chunk->event_type == FLB_INPUT_LOGS) { + router = input_plugin->config->router; + + cmt_counter_add(router->logs_drop_records_total, + cfl_time_now(), + (double) dropped_record_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + + cmt_counter_add(router->logs_drop_bytes_total, + cfl_time_now(), + (double) dropped_byte_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + } - cmt_counter_add(router->logs_drop_bytes_total, - cfl_time_now(), - (double) dropped_byte_count, - 2, - (char *[]){(char *) flb_input_name(old_input_chunk->in), - (char *) flb_output_name(output_plugin)}); + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, + dropped_record_count, + output_plugin->metrics); } - - flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, - dropped_record_count, - output_plugin->metrics); - } #endif - if (chunk_destroy_flag) { - if (old_input_chunk->task != NULL) { - /* - * If the chunk is referenced by a task and task has no active route, - * we need to destroy the task as well. - */ - if (old_input_chunk->task->users == 0) { - flb_debug("[task] drop task_id %d with no active route from input plugin %s", - old_input_chunk->task->id, new_input_chunk->in->name); - flb_task_destroy(old_input_chunk->task, FLB_TRUE); + if (chunk_destroy_flag) { + if (old_input_chunk->task != NULL) { + /* + * If the chunk is referenced by a task and task has no active route, + * we need to destroy the task as well. + */ + if (old_input_chunk->task->users == 0) { + flb_debug("[task] drop task_id %d with no active route from input plugin %s", + old_input_chunk->task->id, new_input_chunk->in->name); + flb_task_destroy(old_input_chunk->task, FLB_TRUE); + + chunk_released = FLB_TRUE; + } + } + else { + flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", + flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); + + flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); chunk_released = FLB_TRUE; } } - else { - flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s", - flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name); - flb_input_chunk_destroy(old_input_chunk, FLB_TRUE); - - chunk_released = FLB_TRUE; + if (chunk_released) { + released_space += chunk_size; } - } - if (chunk_released) { - released_space += chunk_size; + if (released_space >= *required_space) { + break; + } } if (released_space >= *required_space) { @@ -611,9 +644,98 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, return FLB_TRUE; } +static int flb_input_chunk_has_other_routes(struct flb_input_chunk *ic, + struct flb_output_instance *o_ins) +{ + struct mk_list *head; + struct flb_output_instance *candidate; + + mk_list_foreach(head, &ic->in->config->outputs) { + candidate = mk_list_entry(head, struct flb_output_instance, _head); + + if (candidate == o_ins) { + continue; + } + + if (flb_routes_mask_get_bit(ic->routes_mask, + candidate->id, + ic->in->config->router) != 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int flb_input_chunk_prefers_physical_delete( + struct flb_input_chunk *ic, + struct flb_output_instance *o_ins, + int release_scope) +{ + if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) { + if (ic->task == NULL) { + return FLB_TRUE; + } + + return flb_input_chunk_is_task_safe_delete(ic->task); + } + + if (flb_input_chunk_has_other_routes(ic, o_ins) == FLB_TRUE) { + return FLB_FALSE; + } + + if (ic->task == NULL) { + return FLB_TRUE; + } + + return flb_input_chunk_is_task_safe_delete(ic->task); +} + +static size_t flb_input_chunk_get_projected_write_size( + struct flb_input_chunk *ic, + size_t append_size) +{ + size_t page_size; + size_t meta_size; + size_t logical_size; + size_t current_size; + size_t projected_size; + struct cio_chunk *chunk; + + page_size = 4096; + chunk = (struct cio_chunk *) ic->chunk; + + if (chunk != NULL && chunk->ctx != NULL && chunk->ctx->page_size > 0) { + page_size = (size_t) chunk->ctx->page_size; + } + + meta_size = (size_t) cio_meta_size(ic->chunk); + logical_size = (size_t) flb_input_chunk_get_size(ic) + meta_size + 24; + projected_size = logical_size + append_size; + + if (projected_size == 0) { + projected_size = page_size; + } + else { + projected_size = ((projected_size + page_size - 1) / page_size) * page_size; + } + + current_size = 0; + if (ic->fs_counted == FLB_TRUE) { + current_size = (size_t) flb_input_chunk_get_real_size(ic); + } + + if (projected_size > current_size) { + return projected_size - current_size; + } + + return 0; +} + int flb_input_chunk_release_space_compound( struct flb_input_chunk *new_input_chunk, struct flb_output_instance *output_plugin, + size_t required_space, size_t *local_release_requirement, int release_local_space) { @@ -625,10 +747,11 @@ int flb_input_chunk_release_space_compound( storage_backlog_instance = output_plugin->config->storage_input_plugin; - *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk); + *local_release_requirement = required_space; required_space_remainder = (ssize_t) *local_release_requirement; - if (required_space_remainder > 0) { + if (required_space_remainder > 0 && + storage_backlog_instance != NULL) { result = flb_input_chunk_release_space(new_input_chunk, storage_backlog_instance, output_plugin, @@ -717,6 +840,7 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, result = flb_input_chunk_release_space_compound( ic, o_ins, + chunk_size, &local_release_requirement, FLB_TRUE); @@ -2286,6 +2410,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, int ret; int new_chunk = FLB_FALSE; size_t out_size; + size_t placement_size; struct flb_input_chunk *ic = NULL; if (tag_len > FLB_INPUT_CHUNK_TAG_MAX) { @@ -2358,8 +2483,10 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks * (based in creation time) to get enough space for the incoming chunk. */ + placement_size = flb_input_chunk_get_projected_write_size(ic, chunk_size); + if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) - && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { + && flb_input_chunk_place_new_chunk(ic, placement_size) == 0) { /* * If the chunk is not newly created, the chunk might already have logs inside. * We cannot delete (reused) chunks here. From 0723c308b7101cece520f2d73c264d7412a9e078 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 30 Mar 2026 13:06:31 -0600 Subject: [PATCH 2/4] tests: internal: input_chunk: add storage.total_limit_size eviction regressions Signed-off-by: Eduardo Silva --- tests/internal/input_chunk.c | 205 +++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 827957604f9..d9b51c85d07 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -1,6 +1,7 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ #include +#include #include #include #include @@ -75,6 +76,51 @@ static int file_to_buf(const char *path, char **out_buf, size_t *out_size) return 0; } +static int count_chunk_files(const char *path) +{ + int total; + size_t name_len; + struct stat st; + struct dirent *entry; + DIR *dir; + char full_path[PATH_MAX]; + + total = 0; + dir = opendir(path); + if (dir == NULL) { + return 0; + } + + while ((entry = readdir(dir)) != NULL) { + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + + snprintf(full_path, sizeof(full_path) - 1, "%s/%s", path, entry->d_name); + full_path[sizeof(full_path) - 1] = '\0'; + + if (stat(full_path, &st) != 0) { + continue; + } + + if (S_ISDIR(st.st_mode)) { + total += count_chunk_files(full_path); + continue; + } + + name_len = strlen(entry->d_name); + if (name_len > 4 && + strcmp(entry->d_name + name_len - 4, ".flb") == 0) { + total++; + } + } + + closedir(dir); + + return total; +} + /* Given a target, lookup the .out file and return it content in a tail_file_lines structure */ static struct tail_file_lines *get_out_file_content(const char *target) { @@ -920,6 +966,163 @@ void flb_test_input_chunk_grouped_release_space_drop_counters(void) flb_free(storage_path); } +void flb_test_input_chunk_prefers_deletable_files_on_limit(void) +{ + int records; + int chunk_file_count; + struct flb_input_instance *i_ins; + struct flb_output_instance *o_shared; + struct flb_output_instance *o_solo; + struct mk_list *head; + struct mk_list *tmp; + struct flb_input_chunk *ic; + struct flb_task *task; + struct flb_config *cfg; + struct cio_ctx *cio; + struct mk_event_loop *evl; + struct cio_options opts = {0}; + char *root_path; + char stream_path[PATH_MAX]; + char temp_path[128]; + char buf[2048]; + size_t shared_chunk_size; + size_t solo_chunk_size; + size_t total_limit; + + snprintf(temp_path, sizeof(temp_path) - 1, + "/input-chunk-prefer-deletable-files-%i/", + getpid()); + temp_path[sizeof(temp_path) - 1] = '\0'; + + root_path = flb_test_tmpdir_cat(temp_path); + TEST_CHECK(root_path != NULL); + if (!root_path) { + return; + } + + memset(buf, 0x5A, sizeof(buf)); + + flb_init_env(); + cfg = flb_config_init(); + evl = mk_event_loop_create(256); + + TEST_CHECK(evl != NULL); + if (!evl) { + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + cfg->evl = evl; + flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL); + + i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE); + TEST_CHECK(i_ins != NULL); + if (!i_ins) { + flb_config_exit(cfg); + flb_free(root_path); + return; + } + i_ins->storage_type = CIO_STORE_FS; + + cio_options_init(&opts); + opts.root_path = root_path; + opts.log_cb = log_cb; + opts.log_level = CIO_LOG_DEBUG; + opts.flags = CIO_OPEN; + + cio = cio_create(&opts); + TEST_CHECK(cio != NULL); + if (!cio) { + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + flb_storage_input_create(cio, i_ins); + flb_input_init_all(cfg); + + snprintf(stream_path, sizeof(stream_path) - 1, + "%s/%s", root_path, i_ins->name); + stream_path[sizeof(stream_path) - 1] = '\0'; + + o_shared = flb_output_new(cfg, "http", NULL, FLB_TRUE); + o_solo = flb_output_new(cfg, "http", NULL, FLB_TRUE); + TEST_CHECK(o_shared != NULL); + TEST_CHECK(o_solo != NULL); + if (!o_shared || !o_solo) { + cio_destroy(cio); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); + return; + } + + o_shared->id = 0; + o_solo->id = 1; + + flb_output_set_property(o_shared, "match", "shared.*"); + flb_output_set_property(o_shared, "storage.total_limit_size", "10M"); + flb_output_set_property(o_solo, "match", "*"); + flb_output_set_property(o_solo, "storage.total_limit_size", "10M"); + + TEST_CHECK_(flb_router_io_set(cfg) != -1, "unable to router"); + + records = flb_mp_count(buf, sizeof(buf)); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "shared.one", 10, + buf, sizeof(buf)) == 0); + ic = mk_list_entry_last(&i_ins->chunks, struct flb_input_chunk, _head); + shared_chunk_size = flb_input_chunk_get_real_size(ic); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "solo.one", 8, + buf, sizeof(buf)) == 0); + ic = mk_list_entry_last(&i_ins->chunks, struct flb_input_chunk, _head); + solo_chunk_size = flb_input_chunk_get_real_size(ic); + + total_limit = shared_chunk_size + solo_chunk_size + (solo_chunk_size / 2); + o_solo->total_limit_size = total_limit; + + chunk_file_count = count_chunk_files(stream_path); + TEST_CHECK(chunk_file_count == 2); + + TEST_CHECK(flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, + records, "solo.two", 8, + buf, sizeof(buf)) == 0); + + chunk_file_count = count_chunk_files(stream_path); + + /* + * The oldest chunk is shared with another output and cannot be unlinked + * by only dropping a single route. When the solo output needs space, we + * should prefer the next chunk that can actually be deleted. + */ + TEST_CHECK(chunk_file_count == 2); + TEST_CHECK(mk_list_size(&i_ins->chunks) == 2); + + mk_list_foreach_safe(head, tmp, &i_ins->tasks) { + task = mk_list_entry(head, struct flb_task, _head); + flb_task_destroy(task, FLB_TRUE); + } + + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + flb_input_chunk_destroy(ic, FLB_TRUE); + } + + cio_destroy(cio); + flb_router_exit(cfg); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); + flb_free(root_path); +} + /* Test list */ TEST_LIST = { @@ -931,5 +1134,7 @@ TEST_LIST = { {"input_chunk_grouped_auto_records", flb_test_input_chunk_grouped_auto_records}, {"input_chunk_grouped_release_space_drop_counters", flb_test_input_chunk_grouped_release_space_drop_counters}, + {"input_chunk_prefers_deletable_files_on_limit", + flb_test_input_chunk_prefers_deletable_files_on_limit}, {NULL, NULL} }; From ccd4de1e74df744147009703647734922c02401b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 30 Mar 2026 13:51:59 -0600 Subject: [PATCH 3/4] input_chunk: use chunkio alloc growth for storage.total_limit_size eviction Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 52 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index cc41230b6e5..c68e73a642f 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -72,6 +72,23 @@ struct flb_input_chunk_meta_view { uint16_t routing_data_length; }; +/* + * Mirror the leading layout we need from chunkio's private cio_file backend + * so size projection can follow alloc_size/realloc_size growth without + * depending on private headers in this translation unit. + */ +struct flb_input_chunk_cio_file_view { + int fd; + int flags; + int synced; + int allocate_strategy; + size_t fs_size; + size_t data_size; + size_t page_size; + size_t alloc_size; + size_t realloc_size; +}; + static inline int input_chunk_has_magic_bytes(char *buf, int len) { unsigned char *p; @@ -698,27 +715,56 @@ static size_t flb_input_chunk_get_projected_write_size( size_t page_size; size_t meta_size; size_t logical_size; + size_t alloc_size; size_t current_size; + size_t realloc_size; size_t projected_size; struct cio_chunk *chunk; + struct flb_input_chunk_cio_file_view *chunk_file; page_size = 4096; chunk = (struct cio_chunk *) ic->chunk; + chunk_file = NULL; if (chunk != NULL && chunk->ctx != NULL && chunk->ctx->page_size > 0) { page_size = (size_t) chunk->ctx->page_size; } + if (chunk != NULL && chunk->backend != NULL && + chunk->st != NULL && chunk->st->type == CIO_STORE_FS) { + chunk_file = (struct flb_input_chunk_cio_file_view *) chunk->backend; + } + meta_size = (size_t) cio_meta_size(ic->chunk); logical_size = (size_t) flb_input_chunk_get_size(ic) + meta_size + 24; projected_size = logical_size + append_size; - if (projected_size == 0) { - projected_size = page_size; + alloc_size = 0; + realloc_size = page_size; + + if (chunk_file != NULL) { + alloc_size = chunk_file->alloc_size; + if (chunk_file->realloc_size > 0) { + realloc_size = chunk_file->realloc_size; + } + } + + if (alloc_size == 0) { + alloc_size = page_size; } - else { + + if (projected_size > alloc_size) { + projected_size = alloc_size + realloc_size; + + while (projected_size < logical_size + append_size) { + projected_size += realloc_size; + } + projected_size = ((projected_size + page_size - 1) / page_size) * page_size; } + else { + projected_size = alloc_size; + } current_size = 0; if (ic->fs_counted == FLB_TRUE) { From 732996f63849ca342ae962299d0f9761d0cc06a9 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 30 Mar 2026 14:03:36 -0600 Subject: [PATCH 4/4] input_chunk: harden storage.total_limit_size growth projection Signed-off-by: Eduardo Silva --- src/flb_input_chunk.c | 45 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index c68e73a642f..a704c29bb9b 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -73,9 +73,13 @@ struct flb_input_chunk_meta_view { }; /* - * Mirror the leading layout we need from chunkio's private cio_file backend - * so size projection can follow alloc_size/realloc_size growth without - * depending on private headers in this translation unit. + * Mirror the leading layout we need from chunkio's private cio_file backend. + * + * We intentionally avoid including here because that + * header pulls additional private includes that are not available through this + * translation unit's normal include path in this build. We only need the + * stable leading allocation fields to estimate alloc_size/realloc_size growth + * for storage.total_limit_size decisions. */ struct flb_input_chunk_cio_file_view { int fd; @@ -712,6 +716,9 @@ static size_t flb_input_chunk_get_projected_write_size( struct flb_input_chunk *ic, size_t append_size) { + size_t increment_size; + size_t increments; + size_t needed_size; size_t page_size; size_t meta_size; size_t logical_size; @@ -719,6 +726,7 @@ static size_t flb_input_chunk_get_projected_write_size( size_t current_size; size_t realloc_size; size_t projected_size; + size_t target_size; struct cio_chunk *chunk; struct flb_input_chunk_cio_file_view *chunk_file; @@ -736,8 +744,16 @@ static size_t flb_input_chunk_get_projected_write_size( } meta_size = (size_t) cio_meta_size(ic->chunk); + if ((size_t) flb_input_chunk_get_size(ic) > SIZE_MAX - meta_size - 24) { + return SIZE_MAX; + } + logical_size = (size_t) flb_input_chunk_get_size(ic) + meta_size + 24; - projected_size = logical_size + append_size; + if (logical_size > SIZE_MAX - append_size) { + return SIZE_MAX; + } + target_size = logical_size + append_size; + projected_size = target_size; alloc_size = 0; realloc_size = page_size; @@ -753,11 +769,24 @@ static size_t flb_input_chunk_get_projected_write_size( alloc_size = page_size; } - if (projected_size > alloc_size) { - projected_size = alloc_size + realloc_size; + if (target_size > alloc_size) { + projected_size = alloc_size; + needed_size = target_size - projected_size; + + increments = needed_size / realloc_size; + if ((needed_size % realloc_size) != 0) { + increments++; + } + + if (increments > (SIZE_MAX - projected_size) / realloc_size) { + return SIZE_MAX; + } + + increment_size = increments * realloc_size; + projected_size += increment_size; - while (projected_size < logical_size + append_size) { - projected_size += realloc_size; + if (projected_size > SIZE_MAX - (page_size - 1)) { + return SIZE_MAX; } projected_size = ((projected_size + page_size - 1) / page_size) * page_size;