Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 5 additions & 39 deletions be/src/exec/operator/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace doris {
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
batch_size(state->batch_size()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_child_block(Block::create_unique()),
_aggregated_block(Block::create_unique()),
Expand Down Expand Up @@ -195,8 +194,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
SCOPED_TIMER(_insert_keys_to_column_timer);
if (mem_reuse) {
if (_stop_emplace_flag && !out_block->empty()) {
// when out_block row >= batch_size, push it to data_queue, so when _stop_emplace_flag = true, maybe have some data in block
// need output those data firstly
// Output existing aggregated rows before switching to passthrough mode.
DCHECK(_distinct_row.empty());
_distinct_row.resize(rows);
std::iota(_distinct_row.begin(), _distinct_row.end(), 0);
Expand All @@ -211,28 +209,10 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
in_block->replace_by_position(result_idxs[i], output_column);
}
} else {
DCHECK_EQ(_cache_block.rows(), 0);
// is output row > batch_size, split some to cache_block
if (out_block->rows() + _distinct_row.size() > batch_size) {
size_t split_size = batch_size - out_block->rows();
for (int i = 0; i < key_size; ++i) {
auto output_dst =
IColumn::mutate(std::move(out_block->get_by_position(i).column));
key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0,
split_size);
out_block->get_by_position(i).column = std::move(output_dst);
auto cache_dst =
IColumn::mutate(std::move(_cache_block.get_by_position(i).column));
key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, split_size,
_distinct_row.size());
_cache_block.get_by_position(i).column = std::move(cache_dst);
}
} else {
for (int i = 0; i < key_size; ++i) {
auto dst = IColumn::mutate(std::move(out_block->get_by_position(i).column));
key_columns[i]->append_data_by_selector(dst, _distinct_row);
out_block->get_by_position(i).column = std::move(dst);
}
for (int i = 0; i < key_size; ++i) {
auto dst = IColumn::mutate(std::move(out_block->get_by_position(i).column));
key_columns[i]->append_data_by_selector(dst, _distinct_row);
out_block->get_by_position(i).column = std::move(dst);
}
}
} else {
Expand All @@ -252,7 +232,6 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
}
}
out_block->swap(Block(columns_with_schema));
_cache_block = out_block->clone_empty();
if (_stop_emplace_flag) {
in_block->clear(); // clear the column ref with stop_emplace_flag = true
}
Expand Down Expand Up @@ -379,10 +358,6 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, Block* block, bo
if (!local_state._aggregated_block->empty()) {
block->swap(*local_state._aggregated_block);
local_state._aggregated_block->clear_column_data(block->columns());
// The cache block may have additional data due to exceeding the batch size.
if (!local_state._cache_block.empty()) {
local_state._swap_cache_block(local_state._aggregated_block.get());
}
}

local_state._make_nullable_output_key(block);
Expand Down Expand Up @@ -428,15 +403,6 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
return Status::OK();
}
_aggregated_block->clear();
// If the limit is reached, there may still be remaining data in the cache block.
// If the limit is not reached, the cache block must be empty.
// If the query is canceled, it might not satisfy the above conditions.
if (!state->is_cancelled()) {
if (!_reach_limit && !_cache_block.empty()) {
LOG_WARNING("If the limit is not reached, the cache block must be empty.");
}
}
_cache_block.clear();

_arena.clear();
return Base::close(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,18 @@ class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeShar
void _make_nullable_output_key(Block* block);
bool _should_expand_preagg_hash_tables();

void _swap_cache_block(Block* block) {
DCHECK(!_cache_block.is_empty_column());
block->swap(_cache_block);
_cache_block = block->clone_empty();
}

IColumn::Selector _distinct_row;
Arena _arena;
size_t _input_num_rows = 0;
bool _should_expand_hash_table = true;
bool _stop_emplace_flag = false;
const int batch_size;
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
// group by k1,k2
VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<Block> _child_block = nullptr;
bool _child_eos = false;
bool _reach_limit = false;
std::unique_ptr<Block> _aggregated_block = nullptr;
Block _cache_block;
RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _expr_timer = nullptr;
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,21 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) {
auto block =
ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4});
EXPECT_TRUE(op->push(state.get(), &block, false));
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 4);
EXPECT_TRUE(op->need_more_input_data(state.get()));
}

{
auto block = ColumnHelper::create_block<DataTypeInt64>({5, 6, 7, 8});
EXPECT_TRUE(op->push(state.get(), &block, false));
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 8);
EXPECT_TRUE(op->need_more_input_data(state.get()));
}

{
auto block = ColumnHelper::create_block<DataTypeInt64>({9, 10, 11, 12});
EXPECT_TRUE(op->push(state.get(), &block, false));
EXPECT_EQ(local_state->_cache_block.rows(), 2);
EXPECT_EQ(local_state->_aggregated_block->rows(), 10);
EXPECT_EQ(local_state->_aggregated_block->rows(), 12);
EXPECT_FALSE(op->need_more_input_data(state.get()));
}

Expand All @@ -153,32 +150,29 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) {
bool eos = false;
EXPECT_TRUE(op->pull(state.get(), &block, &eos));
EXPECT_FALSE(eos);
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 2);
EXPECT_EQ(block.rows(), 12);
EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
}
{
local_state->_stop_emplace_flag = true;
auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
EXPECT_TRUE(op->push(state.get(), &block, false));
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 5);
EXPECT_EQ(local_state->_aggregated_block->rows(), 3);
EXPECT_FALSE(op->need_more_input_data(state.get()));
}
{
Block block;
bool eos = false;
EXPECT_TRUE(op->pull(state.get(), &block, &eos));
EXPECT_FALSE(eos);
EXPECT_EQ(block.rows(), 5);
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(block.rows(), 3);
EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
}
{
EXPECT_TRUE(op->need_more_input_data(state.get()));
local_state->_stop_emplace_flag = true;
auto block = ColumnHelper::create_block<DataTypeInt64>({13, 14, 15});
EXPECT_TRUE(op->push(state.get(), &block, false));
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 3);
EXPECT_FALSE(op->need_more_input_data(state.get()));
}
Expand All @@ -188,7 +182,6 @@ TEST_F(DistinctStreamingAggOperatorTest, test3) {
EXPECT_TRUE(op->pull(state.get(), &block, &eos));
EXPECT_FALSE(eos);
EXPECT_EQ(block.rows(), 3);
EXPECT_EQ(local_state->_cache_block.rows(), 0);
EXPECT_EQ(local_state->_aggregated_block->rows(), 0);
}
{ EXPECT_TRUE(op->close(state.get())); }
Expand Down
Loading