diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp index e6a26904913500..f093ea4f04107f 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.cpp @@ -36,7 +36,6 @@ namespace doris { DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState(state, parent), - batch_size(state->batch_size()), _agg_data(std::make_unique()), _child_block(Block::create_unique()), _aggregated_block(Block::create_unique()), @@ -195,8 +194,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( SCOPED_TIMER(_insert_keys_to_column_timer); if (mem_reuse) { if (_stop_emplace_flag && !out_block->empty()) { - // when out_block row >= batch_size, push it to data_queue, so when _stop_emplace_flag = true, maybe have some data in block - // need output those data firstly + // Output existing aggregated rows before switching to passthrough mode. DCHECK(_distinct_row.empty()); _distinct_row.resize(rows); std::iota(_distinct_row.begin(), _distinct_row.end(), 0); @@ -211,28 +209,10 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( in_block->replace_by_position(result_idxs[i], output_column); } } else { - DCHECK_EQ(_cache_block.rows(), 0); - // is output row > batch_size, split some to cache_block - if (out_block->rows() + _distinct_row.size() > batch_size) { - size_t split_size = batch_size - out_block->rows(); - for (int i = 0; i < key_size; ++i) { - auto output_dst = - IColumn::mutate(std::move(out_block->get_by_position(i).column)); - key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0, - split_size); - out_block->get_by_position(i).column = std::move(output_dst); - auto cache_dst = - IColumn::mutate(std::move(_cache_block.get_by_position(i).column)); - key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, split_size, - _distinct_row.size()); - _cache_block.get_by_position(i).column = std::move(cache_dst); - } - } else { - for (int i = 0; i < key_size; ++i) { - auto dst = IColumn::mutate(std::move(out_block->get_by_position(i).column)); - key_columns[i]->append_data_by_selector(dst, _distinct_row); - out_block->get_by_position(i).column = std::move(dst); - } + for (int i = 0; i < key_size; ++i) { + auto dst = IColumn::mutate(std::move(out_block->get_by_position(i).column)); + key_columns[i]->append_data_by_selector(dst, _distinct_row); + out_block->get_by_position(i).column = std::move(dst); } } } else { @@ -252,7 +232,6 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( } } out_block->swap(Block(columns_with_schema)); - _cache_block = out_block->clone_empty(); if (_stop_emplace_flag) { in_block->clear(); // clear the column ref with stop_emplace_flag = true } @@ -379,10 +358,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, Block* block, bo if (!local_state._aggregated_block->empty()) { block->swap(*local_state._aggregated_block); local_state._aggregated_block->clear_column_data(block->columns()); - // The cache block may have additional data due to exceeding the batch size. - if (!local_state._cache_block.empty()) { - local_state._swap_cache_block(local_state._aggregated_block.get()); - } } local_state._make_nullable_output_key(block); @@ -428,15 +403,6 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) { return Status::OK(); } _aggregated_block->clear(); - // If the limit is reached, there may still be remaining data in the cache block. - // If the limit is not reached, the cache block must be empty. - // If the query is canceled, it might not satisfy the above conditions. - if (!state->is_cancelled()) { - if (!_reach_limit && !_cache_block.empty()) { - LOG_WARNING("If the limit is not reached, the cache block must be empty."); - } - } - _cache_block.clear(); _arena.clear(); return Base::close(state); diff --git a/be/src/exec/operator/distinct_streaming_aggregation_operator.h b/be/src/exec/operator/distinct_streaming_aggregation_operator.h index abf42eb50cc977..305ae71b37c36b 100644 --- a/be/src/exec/operator/distinct_streaming_aggregation_operator.h +++ b/be/src/exec/operator/distinct_streaming_aggregation_operator.h @@ -56,18 +56,11 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalStateswap(_cache_block); - _cache_block = block->clone_empty(); - } - IColumn::Selector _distinct_row; Arena _arena; size_t _input_num_rows = 0; bool _should_expand_hash_table = true; bool _stop_emplace_flag = false; - const int batch_size; std::unique_ptr _agg_data = nullptr; // group by k1,k2 VExprContextSPtrs _probe_expr_ctxs; @@ -75,7 +68,6 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState _aggregated_block = nullptr; - Block _cache_block; RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _expr_timer = nullptr; RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; diff --git a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp index 88434e47fd7e01..d6c4f3d23c8063 100644 --- a/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp +++ b/be/test/exec/operator/distinct_streaming_aggregation_operator_test.cpp @@ -127,7 +127,6 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { auto block = ColumnHelper::create_block({1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}); EXPECT_TRUE(op->push(state.get(), &block, false)); - EXPECT_EQ(local_state->_cache_block.rows(), 0); EXPECT_EQ(local_state->_aggregated_block->rows(), 4); EXPECT_TRUE(op->need_more_input_data(state.get())); } @@ -135,7 +134,6 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { { auto block = ColumnHelper::create_block({5, 6, 7, 8}); EXPECT_TRUE(op->push(state.get(), &block, false)); - EXPECT_EQ(local_state->_cache_block.rows(), 0); EXPECT_EQ(local_state->_aggregated_block->rows(), 8); EXPECT_TRUE(op->need_more_input_data(state.get())); } @@ -143,8 +141,7 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { { auto block = ColumnHelper::create_block({9, 10, 11, 12}); EXPECT_TRUE(op->push(state.get(), &block, false)); - EXPECT_EQ(local_state->_cache_block.rows(), 2); - EXPECT_EQ(local_state->_aggregated_block->rows(), 10); + EXPECT_EQ(local_state->_aggregated_block->rows(), 12); EXPECT_FALSE(op->need_more_input_data(state.get())); } @@ -153,15 +150,14 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { bool eos = false; EXPECT_TRUE(op->pull(state.get(), &block, &eos)); EXPECT_FALSE(eos); - EXPECT_EQ(local_state->_cache_block.rows(), 0); - EXPECT_EQ(local_state->_aggregated_block->rows(), 2); + EXPECT_EQ(block.rows(), 12); + EXPECT_EQ(local_state->_aggregated_block->rows(), 0); } { local_state->_stop_emplace_flag = true; auto block = ColumnHelper::create_block({13, 14, 15}); EXPECT_TRUE(op->push(state.get(), &block, false)); - EXPECT_EQ(local_state->_cache_block.rows(), 0); - EXPECT_EQ(local_state->_aggregated_block->rows(), 5); + EXPECT_EQ(local_state->_aggregated_block->rows(), 3); EXPECT_FALSE(op->need_more_input_data(state.get())); } { @@ -169,8 +165,7 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { bool eos = false; EXPECT_TRUE(op->pull(state.get(), &block, &eos)); EXPECT_FALSE(eos); - EXPECT_EQ(block.rows(), 5); - EXPECT_EQ(local_state->_cache_block.rows(), 0); + EXPECT_EQ(block.rows(), 3); EXPECT_EQ(local_state->_aggregated_block->rows(), 0); } { @@ -178,7 +173,6 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { local_state->_stop_emplace_flag = true; auto block = ColumnHelper::create_block({13, 14, 15}); EXPECT_TRUE(op->push(state.get(), &block, false)); - EXPECT_EQ(local_state->_cache_block.rows(), 0); EXPECT_EQ(local_state->_aggregated_block->rows(), 3); EXPECT_FALSE(op->need_more_input_data(state.get())); } @@ -188,7 +182,6 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) { EXPECT_TRUE(op->pull(state.get(), &block, &eos)); EXPECT_FALSE(eos); EXPECT_EQ(block.rows(), 3); - EXPECT_EQ(local_state->_cache_block.rows(), 0); EXPECT_EQ(local_state->_aggregated_block->rows(), 0); } { EXPECT_TRUE(op->close(state.get())); }