Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/boost/http_proto/serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,11 @@ struct serializer::stream
class serializer::const_buf_gen_base
{
public:
// Next non-empty buffer
// Returns the next non-empty buffer,
// or an empty buffer if none remain.
virtual
buffers::const_buffer
operator()() = 0;
next() = 0;

// Size of remaining buffers
virtual
Expand Down Expand Up @@ -487,7 +488,7 @@ class serializer::const_buf_gen
}

const_buffer
operator()() override
next() override
{
while(current_ != buffers::end(cbs_))
{
Expand Down
51 changes: 30 additions & 21 deletions src/detail/impl/filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,44 @@ process_impl(
results
{
results rv;

auto it_o = buffers::begin(out);
auto it_i = buffers::begin(in);

if( it_o == buffers::end(out) ||
it_i == buffers::end(in) )
return rv;
auto ob = [&]() -> buffers::mutable_buffer
{
if( it_o != buffers::end(out) )
return *it_o++;
return {};
}();

buffers::const_buffer ib;

auto ob = *it_o++;
auto ib = *it_i++;
for(;;)
{
while( ib.size() == 0 )
{
if( it_i == buffers::end(in) )
{
if( more )
return rv;

// if more == false we return only
// when output buffers are full.
break;
}
else
{
ib = *it_i++;
}
}

// empty input buffers may be passed, and
// this is intentional and valid.
results rs = process_impl(ob, ib, more);
results rs = process_impl(
ob,
ib,
more || it_i != buffers::end(in));

rv.out_bytes += rs.out_bytes;
rv.in_bytes += rs.in_bytes;
Expand All @@ -62,21 +86,6 @@ process_impl(
return rv;
ob = *it_o++;
}

if( ib.size() == 0 )
{
if( it_i == buffers::end(in) )
{
// if `more == false` we return only
// when `out` buffers are full.
if( more )
return rv;
}
else
{
ib = *it_i++;
}
}
}
}

Expand Down
170 changes: 99 additions & 71 deletions src/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,11 @@ void
serializer::
reset() noexcept
{
ws_.clear();
filter_ = nullptr;

cb0_ = {};
tmp_ = {};

more_input_ = false;
is_done_ = false;
is_header_done_ = false;
is_chunked_ = false;
needs_exp100_continue_ = false;
filter_done_ = false;

ws_.clear();
}

//------------------------------------------------
Expand Down Expand Up @@ -343,23 +335,23 @@ prepare() ->
prepped_.slide_to_front();
while(prepped_.capacity() != 0)
{
auto buf = buf_gen_->operator()();
auto buf = buf_gen_->next();
if(buf.size() != 0)
{
prepped_.append(buf);
}
else // buf_gen_ is empty
{
// crlf and final chunk
if(tmp_.size() != 0)
// append crlf and final chunk
if(is_chunked_)
{
prepped_.append(tmp_);
tmp_ = {};
more_input_ = false;
}
break;
}
}
if(buf_gen_->is_empty() && tmp_.size() == 0)
if(buf_gen_->is_empty() && !is_chunked_)
more_input_ = false;
}
return const_buffers_type(
Expand All @@ -377,16 +369,18 @@ prepare() ->
if(apndr.is_full())
break;

auto rs = source_->read(
const auto rs = source_->read(
apndr.prepare());

if(rs.ec.failed())
{
is_done_ = true;
BOOST_HTTP_PROTO_RETURN_EC(rs.ec);
return rs.ec;
}

more_input_ = !rs.finished;
if(rs.finished)
more_input_ = false;

apndr.commit(rs.bytes, more_input_);
break;
}
Expand All @@ -400,90 +394,121 @@ prepare() ->
}
else // filter
{
if(st_ == style::empty)
switch(st_)
{
case style::empty:
return const_buffers_type(
prepped_.begin(),
prepped_.size());

auto get_input = [&]()
case style::buffers:
{
if(st_ == style::buffers)
appender apndr(cb0_, is_chunked_);
while(!apndr.is_full() && !filter_done_)
{
// TODO: for efficiency of deflator, we might
// need to return multiple buffers at once
if(tmp_.size() == 0)
if(more_input_ && tmp_.size() == 0)
{
tmp_ = buf_gen_->operator()();
more_input_ = !buf_gen_->is_empty();
tmp_ = buf_gen_->next();
if(tmp_.size() == 0) // buf_gen_ is empty
more_input_ = false;
}
return buffers::
const_buffer_pair{ tmp_, {} };
}

BOOST_ASSERT(
st_ == style::source ||
st_ == style::stream);
const auto rs = filter_->process(
apndr.prepare(),
tmp_,
more_input_);

if(rs.ec.failed())
{
is_done_ = true;
return rs.ec;
}

tmp_ = buffers::sans_prefix(tmp_, rs.in_bytes);
apndr.commit(rs.out_bytes, !rs.finished);

if(st_ == style::source &&
more_input_ &&
cb1_.capacity() != 0)
{
// TODO: handle source error
auto rs = source_->read(
cb1_.prepare(cb1_.capacity()));
if(rs.finished)
more_input_ = false;
cb1_.commit(rs.bytes);
filter_done_ = true;
}
break;
}

return cb1_.data();
};

auto consume = [&](std::size_t n)
case style::source:
{
if(st_ == style::buffers)
appender apndr(cb0_, is_chunked_);
while(!apndr.is_full() && !filter_done_)
{
tmp_ = buffers::sans_prefix(
tmp_, n);
return;
if(more_input_ && cb1_.capacity() != 0)
{
const auto rs = source_->read(
cb1_.prepare(cb1_.capacity()));
if(rs.ec.failed())
{
is_done_ = true;
return rs.ec;
}
if(rs.finished)
more_input_ = false;
cb1_.commit(rs.bytes);
}

const auto rs = filter_->process(
apndr.prepare(),
cb1_.data(),
more_input_);

if(rs.ec.failed())
{
is_done_ = true;
return rs.ec;
}

cb1_.consume(rs.in_bytes);
apndr.commit(rs.out_bytes, !rs.finished);

if(rs.finished)
filter_done_ = true;
}
BOOST_ASSERT(
st_ == style::source ||
st_ == style::stream);
cb1_.consume(n);
};

// handles chunked payloads automatically
appender apndr(cb0_, is_chunked_);
for(;;)
break;
}

case style::stream:
{
if(apndr.is_full())
appender apndr(cb0_, is_chunked_);

if(apndr.is_full() || filter_done_)
break;

auto cbs = get_input();
// The stream object is expected to
// have already populated cb1_
if(more_input_ && cb1_.size() == 0)
{
if(!prepped_.empty())
break;

if(more_input_ && buffers::size(cbs) == 0)
break;
BOOST_HTTP_PROTO_RETURN_EC(
error::need_data);
}

auto rs = filter_->process(
const auto rs = filter_->process(
apndr.prepare(),
cbs,
cb1_.data(),
more_input_);

if(rs.ec.failed())
{
is_done_ = true;
BOOST_HTTP_PROTO_RETURN_EC(rs.ec);
return rs.ec;
}

consume(rs.in_bytes);
cb1_.consume(rs.in_bytes);
apndr.commit(rs.out_bytes, !rs.finished);

if(rs.finished)
{
filter_done_ = true;
break;
}

break;
}
}
}

Expand Down Expand Up @@ -623,6 +648,7 @@ start_empty(
}

prepped_[0] = { m.ph_->cbuf, m.ph_->size };
more_input_ = false;
}

void
Expand Down Expand Up @@ -654,7 +680,7 @@ start_buffers(
std::generate(
prepped_.begin() + 1,
prepped_.end(),
std::ref(*buf_gen_));
[this](){ return buf_gen_->next(); });
more_input_ = !buf_gen_->is_empty();
return;
}
Expand All @@ -676,6 +702,7 @@ start_buffers(

prepped_[0] = { m.ph_->cbuf, m.ph_->size };
prepped_[1] = final_chunk;
more_input_ = false;
return;
}

Expand Down Expand Up @@ -717,14 +744,14 @@ start_buffers(
std::generate(
prepped_.begin() + 2,
prepped_.end() - 1,
std::ref(*buf_gen_));
[this](){ return buf_gen_->next(); });

more_input_ = !buf_gen_->is_empty();
// assigning the last slot
if(more_input_)
{
prepped_[prepped_.size() - 1] =
buf_gen_->operator()();
buf_gen_->next();

// deferred until buf_gen_ is drained
tmp_ = crlf_and_final_chunk;
Expand Down Expand Up @@ -758,6 +785,7 @@ start_buffers(
}

prepped_[0] = { m.ph_->cbuf, m.ph_->size };
tmp_ = {};
more_input_ = !buf_gen_->is_empty();
}

Expand Down
Loading
Loading