From 5074f93efa30df53d00b550e4198a5a56c93dbde Mon Sep 17 00:00:00 2001 From: Mohammad Nejati Date: Sun, 15 Jun 2025 16:34:50 +0000 Subject: [PATCH] Improved buffer management in serializer --- include/boost/http_proto/serializer.hpp | 7 +- src/detail/impl/filter.hpp | 51 ++++--- src/serializer.cpp | 170 ++++++++++++++---------- src/source.cpp | 16 ++- test/unit/zlib.cpp | 23 ++-- 5 files changed, 154 insertions(+), 113 deletions(-) diff --git a/include/boost/http_proto/serializer.hpp b/include/boost/http_proto/serializer.hpp index ebd309cc..9b4702f0 100644 --- a/include/boost/http_proto/serializer.hpp +++ b/include/boost/http_proto/serializer.hpp @@ -444,10 +444,11 @@ struct serializer::stream class serializer::const_buf_gen_base { public: - // Next non-empty buffer + // Returns the next non-empty buffer, + // or an empty buffer if none remain. virtual buffers::const_buffer - operator()() = 0; + next() = 0; // Size of remaining buffers virtual @@ -487,7 +488,7 @@ class serializer::const_buf_gen } const_buffer - operator()() override + next() override { while(current_ != buffers::end(cbs_)) { diff --git a/src/detail/impl/filter.hpp b/src/detail/impl/filter.hpp index d5417c23..94d5da71 100644 --- a/src/detail/impl/filter.hpp +++ b/src/detail/impl/filter.hpp @@ -30,20 +30,44 @@ process_impl( results { results rv; + auto it_o = buffers::begin(out); auto it_i = buffers::begin(in); - if( it_o == buffers::end(out) || - it_i == buffers::end(in) ) - return rv; + auto ob = [&]() -> buffers::mutable_buffer + { + if( it_o != buffers::end(out) ) + return *it_o++; + return {}; + }(); + + buffers::const_buffer ib; - auto ob = *it_o++; - auto ib = *it_i++; for(;;) { + while( ib.size() == 0 ) + { + if( it_i == buffers::end(in) ) + { + if( more ) + return rv; + + // if more == false we return only + // when output buffers are full. + break; + } + else + { + ib = *it_i++; + } + } + // empty input buffers may be passed, and // this is intentional and valid. - results rs = process_impl(ob, ib, more); + results rs = process_impl( + ob, + ib, + more || it_i != buffers::end(in)); rv.out_bytes += rs.out_bytes; rv.in_bytes += rs.in_bytes; @@ -62,21 +86,6 @@ process_impl( return rv; ob = *it_o++; } - - if( ib.size() == 0 ) - { - if( it_i == buffers::end(in) ) - { - // if `more == false` we return only - // when `out` buffers are full. - if( more ) - return rv; - } - else - { - ib = *it_i++; - } - } } } diff --git a/src/serializer.cpp b/src/serializer.cpp index 830ee4ed..208ed582 100644 --- a/src/serializer.cpp +++ b/src/serializer.cpp @@ -286,19 +286,11 @@ void serializer:: reset() noexcept { + ws_.clear(); filter_ = nullptr; - - cb0_ = {}; - tmp_ = {}; - - more_input_ = false; is_done_ = false; is_header_done_ = false; - is_chunked_ = false; - needs_exp100_continue_ = false; filter_done_ = false; - - ws_.clear(); } //------------------------------------------------ @@ -343,23 +335,23 @@ prepare() -> prepped_.slide_to_front(); while(prepped_.capacity() != 0) { - auto buf = buf_gen_->operator()(); + auto buf = buf_gen_->next(); if(buf.size() != 0) { prepped_.append(buf); } else // buf_gen_ is empty { - // crlf and final chunk - if(tmp_.size() != 0) + // append crlf and final chunk + if(is_chunked_) { prepped_.append(tmp_); - tmp_ = {}; + more_input_ = false; } break; } } - if(buf_gen_->is_empty() && tmp_.size() == 0) + if(buf_gen_->is_empty() && !is_chunked_) more_input_ = false; } return const_buffers_type( @@ -377,16 +369,18 @@ prepare() -> if(apndr.is_full()) break; - auto rs = source_->read( + const auto rs = source_->read( apndr.prepare()); if(rs.ec.failed()) { is_done_ = true; - BOOST_HTTP_PROTO_RETURN_EC(rs.ec); + return rs.ec; } - more_input_ = !rs.finished; + if(rs.finished) + more_input_ = false; + apndr.commit(rs.bytes, more_input_); break; } @@ -400,90 +394,121 @@ prepare() -> } else // filter { - if(st_ == style::empty) + switch(st_) + { + case style::empty: return const_buffers_type( prepped_.begin(), prepped_.size()); - auto get_input = [&]() + case style::buffers: { - if(st_ == style::buffers) + appender apndr(cb0_, is_chunked_); + while(!apndr.is_full() && !filter_done_) { - // TODO: for efficiency of deflator, we might - // need to return multiple buffers at once - if(tmp_.size() == 0) + if(more_input_ && tmp_.size() == 0) { - tmp_ = buf_gen_->operator()(); - more_input_ = !buf_gen_->is_empty(); + tmp_ = buf_gen_->next(); + if(tmp_.size() == 0) // buf_gen_ is empty + more_input_ = false; } - return buffers:: - const_buffer_pair{ tmp_, {} }; - } - BOOST_ASSERT( - st_ == style::source || - st_ == style::stream); + const auto rs = filter_->process( + apndr.prepare(), + tmp_, + more_input_); + + if(rs.ec.failed()) + { + is_done_ = true; + return rs.ec; + } + + tmp_ = buffers::sans_prefix(tmp_, rs.in_bytes); + apndr.commit(rs.out_bytes, !rs.finished); - if(st_ == style::source && - more_input_ && - cb1_.capacity() != 0) - { - // TODO: handle source error - auto rs = source_->read( - cb1_.prepare(cb1_.capacity())); if(rs.finished) - more_input_ = false; - cb1_.commit(rs.bytes); + filter_done_ = true; } + break; + } - return cb1_.data(); - }; - - auto consume = [&](std::size_t n) + case style::source: { - if(st_ == style::buffers) + appender apndr(cb0_, is_chunked_); + while(!apndr.is_full() && !filter_done_) { - tmp_ = buffers::sans_prefix( - tmp_, n); - return; + if(more_input_ && cb1_.capacity() != 0) + { + const auto rs = source_->read( + cb1_.prepare(cb1_.capacity())); + if(rs.ec.failed()) + { + is_done_ = true; + return rs.ec; + } + if(rs.finished) + more_input_ = false; + cb1_.commit(rs.bytes); + } + + const auto rs = filter_->process( + apndr.prepare(), + cb1_.data(), + more_input_); + + if(rs.ec.failed()) + { + is_done_ = true; + return rs.ec; + } + + cb1_.consume(rs.in_bytes); + apndr.commit(rs.out_bytes, !rs.finished); + + if(rs.finished) + filter_done_ = true; } - BOOST_ASSERT( - st_ == style::source || - st_ == style::stream); - cb1_.consume(n); - }; - - // handles chunked payloads automatically - appender apndr(cb0_, is_chunked_); - for(;;) + break; + } + + case style::stream: { - if(apndr.is_full()) + appender apndr(cb0_, is_chunked_); + + if(apndr.is_full() || filter_done_) break; - auto cbs = get_input(); + // The stream object is expected to + // have already populated cb1_ + if(more_input_ && cb1_.size() == 0) + { + if(!prepped_.empty()) + break; - if(more_input_ && buffers::size(cbs) == 0) - break; + BOOST_HTTP_PROTO_RETURN_EC( + error::need_data); + } - auto rs = filter_->process( + const auto rs = filter_->process( apndr.prepare(), - cbs, + cb1_.data(), more_input_); if(rs.ec.failed()) { is_done_ = true; - BOOST_HTTP_PROTO_RETURN_EC(rs.ec); + return rs.ec; } - consume(rs.in_bytes); + cb1_.consume(rs.in_bytes); apndr.commit(rs.out_bytes, !rs.finished); if(rs.finished) - { filter_done_ = true; - break; - } + + break; + } } } @@ -623,6 +648,7 @@ start_empty( } prepped_[0] = { m.ph_->cbuf, m.ph_->size }; + more_input_ = false; } void @@ -654,7 +680,7 @@ start_buffers( std::generate( prepped_.begin() + 1, prepped_.end(), - std::ref(*buf_gen_)); + [this](){ return buf_gen_->next(); }); more_input_ = !buf_gen_->is_empty(); return; } @@ -676,6 +702,7 @@ start_buffers( prepped_[0] = { m.ph_->cbuf, m.ph_->size }; prepped_[1] = final_chunk; + more_input_ = false; return; } @@ -717,14 +744,14 @@ start_buffers( std::generate( prepped_.begin() + 2, prepped_.end() - 1, - std::ref(*buf_gen_)); + [this](){ return buf_gen_->next(); }); more_input_ = !buf_gen_->is_empty(); // assigning the last slot if(more_input_) { prepped_[prepped_.size() - 1] = - buf_gen_->operator()(); + buf_gen_->next(); // deferred until buf_gen_ is drained tmp_ = crlf_and_final_chunk; @@ -758,6 +785,7 @@ start_buffers( } prepped_[0] = { m.ph_->cbuf, m.ph_->size }; + tmp_ = {}; more_input_ = !buf_gen_->is_empty(); } diff --git a/src/source.cpp b/src/source.cpp index 235003ea..ca7abc93 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -1,15 +1,14 @@ // // Copyright (c) 2023 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2025 Mohammad Nejati // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -// Official repository: https://github.com/cppalliance/buffers +// Official repository: https://github.com/cppalliance/http_proto // #include -#include -#include namespace boost { namespace http_proto { @@ -28,11 +27,16 @@ on_read( do { buffers::mutable_buffer b(*it++); - rv += on_read(b); - if(rv.ec.failed()) + auto rs = on_read(b); + rv += rs; + if(rs.ec.failed()) return rv; - if(rv.finished) + if(rs.finished) break; + // Source must fill the entire buffer + // unless it has finished + if(b.size() != rs.bytes) + detail::throw_logic_error(); } while(it != end_); return rv; diff --git a/test/unit/zlib.cpp b/test/unit/zlib.cpp index 4cc1a83b..89f5ffef 100644 --- a/test/unit/zlib.cpp +++ b/test/unit/zlib.cpp @@ -9,7 +9,9 @@ // #include +#include +#include "boost/buffers/prefix.hpp" #include "test_suite.hpp" #ifndef BOOST_HTTP_PROTO_HAS_ZLIB @@ -186,7 +188,7 @@ struct zlib_test } std::vector decompressed_output( - 2 * expected.size(), 0x00); + 2 * expected.size() + 50, 0x00); zs.next_in = compressed.data(); zs.avail_in = @@ -234,9 +236,7 @@ struct zlib_test b, buffers::const_buffer( body_view_.data(), - std::min( - std::size_t{512}, - body_view_.size()))); + body_view_.size())); body_view_ = body_view_.subspan(n); rs.bytes = n; @@ -289,9 +289,8 @@ struct zlib_test std::min( std::size_t{512}, body_view.size()))); - - BOOST_TEST_GT(n, 0); - stream.commit(n); + body_view = body_view.subspan(n); + stream.commit(n); auto cbs = sr.prepare().value(); BOOST_TEST_GT(buffers::size(cbs), 0); @@ -301,8 +300,8 @@ struct zlib_test BOOST_TEST_EQ(n2, buffers::size(cbs)); sr.consume(n2); output_buf = buffers::sans_prefix(output_buf, n2); - body_view = body_view.subspan(n); } + stream.close(); while(! sr.is_done() ) @@ -351,9 +350,6 @@ struct zlib_test buf_seq.push_back( {body_view.data() + offset, remaining}); } - - for( auto buf : buf_seq ) - BOOST_TEST_GT(buf.size(), 0); } buffers::const_buffer_span bufs( @@ -422,7 +418,7 @@ struct zlib_test core::string_view str = header; std::vector output( - str.size() + 3 * body.size(), 0x00); + str.size() + 3 * body.size() + 50, 0x00); span body_view = body; auto output_buf = @@ -492,6 +488,8 @@ struct zlib_test void test_serializer() { + std::string empty_body = ""; + std::string short_body = "hello world, compression seems super duper cool! hmm, but what if I also add like a whole bunch of text to this thing????"; @@ -499,6 +497,7 @@ struct zlib_test generate_book(350000); std::vector bodies = { + empty_body, short_body, long_body };