boost/beast/websocket/impl/read.ipp
//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
#ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
#include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/core/bind_handler.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/core/buffers_suffix.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/clamp.hpp>
#include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/post.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/optional.hpp>
#include <boost/throw_exception.hpp>
#include <algorithm>
#include <limits>
#include <memory>
namespace boost {
namespace beast {
namespace websocket {
/* Read some message frame data.
Also reads and handles control frames.
*/
/*  Composed asynchronous operation behind async_read_some.

    The object is simultaneously the coroutine state and the
    completion handler passed to every intermediate operation
    on the next layer. It holds:

      h_              the caller's final completion handler
      ws_             the websocket stream being read
      bs_             the caller's buffers, never consumed
      cb_             a consumable view of the same buffers
      bytes_written_  count reported to the final handler
      ev_ / code_     error and close code saved for the
                      closing sequence
      tok_            token obtained from ws_.tok_.unique(),
                      compared against the stream's blocks
      did_read_       set once the compressed path has read
      cont_           last `cont` argument to operator()
*/
template<class NextLayer>
template<
    class MutableBufferSequence,
    class Handler>
class stream<NextLayer>::read_some_op
    : public boost::asio::coroutine
{
    // Executor type reported by the owning stream
    using stream_executor_type = decltype(
        std::declval<stream<NextLayer>&>().get_executor());

    Handler h_;
    stream<NextLayer>& ws_;
    MutableBufferSequence bs_;
    buffers_suffix<MutableBufferSequence> cb_;
    std::size_t bytes_written_ = 0;
    error_code ev_;
    token tok_;
    close_code code_ = close_code::none;
    bool did_read_ = false;
    bool cont_ = false;

public:
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    using executor_type =
        boost::asio::associated_executor_t<
            Handler, stream_executor_type>;

    read_some_op(read_some_op const&) = default;
    read_some_op(read_some_op&&) = default;

    // Capture the final handler, the stream and the
    // destination buffers; a fresh token is drawn from
    // the stream for this operation.
    template<class DeducedHandler>
    read_some_op(
        DeducedHandler&& final_handler,
        stream<NextLayer>& owner,
        MutableBufferSequence const& buffers)
        : h_(std::forward<DeducedHandler>(final_handler))
        , ws_(owner)
        , bs_(buffers)
        , cb_(buffers)
        , tok_(owner.tok_.unique())
    {
    }

    // Allocator associated with the final handler
    allocator_type
    get_allocator() const noexcept
    {
        return boost::asio::get_associated_allocator(h_);
    }

    // Executor associated with the final handler, with
    // the stream's executor as the fallback
    executor_type
    get_executor() const noexcept
    {
        return boost::asio::get_associated_executor(
            h_, ws_.get_executor());
    }

    // Access to the wrapped final handler
    Handler&
    handler()
    {
        return h_;
    }

    // Coroutine entry point; see the out-of-line definition
    void operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0,
        bool cont = true);

    // A continuation if the last invocation said so,
    // otherwise defer to the final handler's own hook.
    friend
    bool asio_handler_is_continuation(read_some_op* op)
    {
        if(op->cont_)
            return true;
        using boost::asio::asio_handler_is_continuation;
        return asio_handler_is_continuation(
            std::addressof(op->h_));
    }
};
/*  Coroutine body of read_some_op.

    Every invocation re-enters at the most recent yield point
    (BOOST_ASIO_CORO_REENTER). In order, the operation:

    1. Acquires the stream's read block (ws_.rd_block_), parking
       itself in ws_.paused_r_rd_ when another operation holds it.
    2. When no payload remains, parses a frame header and handles
       ping, pong and close control frames in place.
    3. Moves message payload into the caller's buffers, either
       verbatim or through the inflater ws_.pmd_->zi.
    4. At `close`: acquires the write block, sends a close frame
       if none was written yet, and tears down the next layer.
    5. At `upcall`: releases the blocks still held under tok_,
       wakes paused operations, and delivers
       (ec, bytes_written_) to the final handler.

    ec                 result of the intermediate operation
                       that resumed the coroutine
    bytes_transferred  byte count of that intermediate operation
    cont               stored in cont_; when false at `upcall`
                       the final handler is posted instead of
                       being invoked inline
*/
template<class NextLayer>
template<class MutableBufferSequence, class Handler>
void
stream<NextLayer>::
read_some_op<MutableBufferSequence, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred,
bool cont)
{
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_size;
// Scratch output for parse_fh / read_close. It is a local, so
// it is re-initialized on every re-entry; a value that must
// survive a yield is copied into code_ first.
close_code code{};
// Remembered for asio_handler_is_continuation and the upcall
cont_ = cont;
BOOST_ASIO_CORO_REENTER(*this)
{
// Maybe suspend
do_maybe_suspend:
if(! ws_.rd_block_)
{
// Acquire the read block
ws_.rd_block_ = tok_;
// Make sure the stream is not closed
if( ws_.status_ == status::closed ||
ws_.status_ == status::failed)
{
ec = boost::asio::error::operation_aborted;
goto upcall;
}
}
else
{
do_suspend:
// Suspend
BOOST_ASSERT(ws_.rd_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.paused_r_rd_.save(std::move(*this));
// Acquire the read block
BOOST_ASSERT(! ws_.rd_block_);
ws_.rd_block_ = tok_;
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(ws_.rd_block_ == tok_);
// The only way to get read blocked is if
// a `close_op` wrote a close frame
BOOST_ASSERT(ws_.wr_close_);
BOOST_ASSERT(ws_.status_ != status::open);
ec = boost::asio::error::operation_aborted;
goto upcall;
}
// if status_ == status::closing, we want to suspend
// the read operation until the close completes,
// then finish the read with operation_aborted.
loop:
BOOST_ASSERT(ws_.rd_block_ == tok_);
// See if we need to read a frame header. This
// condition is structured to give the decompressor
// a chance to emit the final empty deflate block
//
if(ws_.rd_remain_ == 0 &&
(! ws_.rd_fh_.fin || ws_.rd_done_))
{
// Read frame header
while(! ws_.parse_fh(
ws_.rd_fh_, ws_.rd_buf_, code))
{
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
code_ = code;
ev_ = error::failed;
goto close;
}
BOOST_ASSERT(ws_.rd_block_ == tok_);
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
BOOST_ASSERT(ws_.rd_block_ == tok_);
if(! ws_.check_ok(ec))
goto upcall;
ws_.rd_buf_.commit(bytes_transferred);
// Allow a close operation
// to acquire the read block
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
if( ws_.paused_r_close_.maybe_invoke())
{
// Suspend
BOOST_ASSERT(ws_.rd_block_);
goto do_suspend;
}
// Acquire read block
ws_.rd_block_ = tok_;
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
detail::mask_inplace(buffers_prefix(
clamp(ws_.rd_fh_.len),
ws_.rd_buf_.data()),
ws_.rd_key_);
if(detail::is_control(ws_.rd_fh_.op))
{
// Clear this otherwise the next
// frame will be considered final.
ws_.rd_fh_.fin = false;
// Handle ping frame
if(ws_.rd_fh_.op == detail::opcode::ping)
{
// Inner scope: these locals are gone before the
// yields below, only ws_.rd_fb_ carries the pong.
{
auto const b = buffers_prefix(
clamp(ws_.rd_fh_.len),
ws_.rd_buf_.data());
auto const len = buffer_size(b);
BOOST_ASSERT(len == ws_.rd_fh_.len);
ping_data payload;
detail::read_ping(payload, b);
ws_.rd_buf_.consume(len);
// Ignore ping when closing
if(ws_.status_ == status::closing)
goto loop;
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::ping, payload);
// Serialize the pong reply into the stream's
// frame buffer
ws_.rd_fb_.reset();
ws_.template write_ping<
flat_static_buffer_base>(ws_.rd_fb_,
detail::opcode::pong, payload);
}
//BOOST_ASSERT(! ws_.paused_r_close_);
// Allow a close operation
// to acquire the read block
BOOST_ASSERT(ws_.rd_block_ == tok_);
ws_.rd_block_.reset();
// (the result is not examined here, unlike in
// the header-read loop above)
ws_.paused_r_close_.maybe_invoke();
// Maybe suspend
if(! ws_.wr_block_)
{
// Acquire the write block
ws_.wr_block_ = tok_;
}
else
{
// Suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.paused_rd_.save(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! ws_.wr_block_);
ws_.wr_block_ = tok_;
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(! ws_.check_open(ec))
goto upcall;
}
// Send pong
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(ws_.stream_,
ws_.rd_fb_.data(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(! ws_.check_ok(ec))
goto upcall;
// Release the write block and wake at most one
// paused operation, tried in the order close,
// ping, write (the || chain short-circuits).
ws_.wr_block_.reset();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
// The read block was given up above, so start
// over by re-acquiring it.
goto do_maybe_suspend;
}
// Handle pong frame
if(ws_.rd_fh_.op == detail::opcode::pong)
{
auto const cb = buffers_prefix(clamp(
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_fh_.len);
code = close_code::none;
ping_data payload;
detail::read_ping(payload, cb);
ws_.rd_buf_.consume(len);
// Ignore pong when closing
if(! ws_.wr_close_ && ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::pong, payload);
goto loop;
}
// Handle close frame
BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
{
auto const cb = buffers_prefix(clamp(
ws_.rd_fh_.len), ws_.rd_buf_.data());
auto const len = buffer_size(cb);
BOOST_ASSERT(len == ws_.rd_fh_.len);
BOOST_ASSERT(! ws_.rd_close_);
ws_.rd_close_ = true;
close_reason cr;
detail::read_close(cr, cb, code);
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
code_ = code;
ev_ = error::failed;
goto close;
}
ws_.cr_ = cr;
ws_.rd_buf_.consume(len);
if(ws_.ctrl_cb_)
ws_.ctrl_cb_(frame_type::close,
ws_.cr_.reason);
// See if we are already closing
if(ws_.status_ == status::closing)
{
// _Close the WebSocket Connection_
BOOST_ASSERT(ws_.wr_close_);
code_ = close_code::none;
ev_ = error::closed;
goto close;
}
// _Start the WebSocket Closing Handshake_
// Reply with the peer's code, or `normal` when the
// peer's close frame carried none.
code_ = cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code);
ev_ = error::closed;
goto close;
}
}
if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
{
// Empty non-final frame
goto loop;
}
ws_.rd_done_ = false;
}
// Uncompressed path: taken when pmd_ is unset or
// pmd_->rd_set is false.
if(! ws_.pmd_ || ! ws_.pmd_->rd_set)
{
if(ws_.rd_remain_ > 0)
{
if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
(std::min)(clamp(ws_.rd_remain_),
buffer_size(cb_)))
{
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
if(! ws_.check_ok(ec))
goto upcall;
ws_.rd_buf_.commit(bytes_transferred);
// Unmask only the part belonging to this frame
if(ws_.rd_fh_.mask)
detail::mask_inplace(buffers_prefix(clamp(
ws_.rd_remain_), ws_.rd_buf_.data()),
ws_.rd_key_);
}
if(ws_.rd_buf_.size() > 0)
{
// Copy from the read buffer.
// The mask was already applied.
bytes_transferred = buffer_copy(cb_,
ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
auto const mb = buffers_prefix(
bytes_transferred, cb_);
ws_.rd_remain_ -= bytes_transferred;
if(ws_.rd_op_ == detail::opcode::text)
{
// finish() is only consulted once the final
// frame's payload is complete
if(! ws_.rd_utf8_.write(mb) ||
(ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
ev_ = error::failed;
goto close;
}
}
bytes_written_ += bytes_transferred;
ws_.rd_size_ += bytes_transferred;
ws_.rd_buf_.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(ws_.rd_remain_ > 0);
BOOST_ASSERT(buffer_size(cb_) > 0);
BOOST_ASSERT(buffer_size(buffers_prefix(
clamp(ws_.rd_remain_), cb_)) > 0);
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(buffers_prefix(
clamp(ws_.rd_remain_), cb_), std::move(*this));
if(! ws_.check_ok(ec))
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffers_prefix(
bytes_transferred, cb_);
ws_.rd_remain_ -= bytes_transferred;
// Data landed directly in the caller's buffer,
// so unmask it there
if(ws_.rd_fh_.mask)
detail::mask_inplace(mb, ws_.rd_key_);
if(ws_.rd_op_ == detail::opcode::text)
{
if(! ws_.rd_utf8_.write(mb) ||
(ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
ev_ = error::failed;
goto close;
}
}
bytes_written_ += bytes_transferred;
ws_.rd_size_ += bytes_transferred;
}
}
ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
}
else
{
// Read compressed message frame payload:
// inflate even if rd_fh_.len == 0, otherwise we
// never emit the end-of-stream deflate block.
while(buffer_size(cb_) > 0)
{
// did_read_ limits this operation to one read from
// the next layer while on the compressed path
if( ws_.rd_remain_ > 0 &&
ws_.rd_buf_.size() == 0 &&
! did_read_)
{
// read new
BOOST_ASIO_CORO_YIELD
ws_.stream_.async_read_some(
ws_.rd_buf_.prepare(read_size(
ws_.rd_buf_, ws_.rd_buf_.max_size())),
std::move(*this));
if(! ws_.check_ok(ec))
goto upcall;
BOOST_ASSERT(bytes_transferred > 0);
ws_.rd_buf_.commit(bytes_transferred);
if(ws_.rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(ws_.rd_remain_),
ws_.rd_buf_.data()), ws_.rd_key_);
did_read_ = true;
}
// Output goes to the first buffer remaining in cb_
zlib::z_params zs;
{
auto const out = buffers_front(cb_);
zs.next_out = out.data();
zs.avail_out = out.size();
BOOST_ASSERT(zs.avail_out > 0);
}
if(ws_.rd_remain_ > 0)
{
if(ws_.rd_buf_.size() > 0)
{
// use what's there
auto const in = buffers_prefix(
clamp(ws_.rd_remain_), buffers_front(
ws_.rd_buf_.data()));
zs.avail_in = in.size();
zs.next_in = in.data();
}
else
{
// Nothing buffered and the one permitted read
// was already used
break;
}
}
else if(ws_.rd_fh_.fin)
{
// append the empty block codes
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
if(! ec)
{
// https://github.com/madler/zlib/issues/280
if(zs.total_out > 0)
ec = error::partial_deflate_block;
}
if(! ws_.check_ok(ec))
goto upcall;
// Reset the inflater when the peer's side of the
// connection was configured for no context takeover
if(
(ws_.role_ == role_type::client &&
ws_.pmd_config_.server_no_context_takeover) ||
(ws_.role_ == role_type::server &&
ws_.pmd_config_.client_no_context_takeover))
ws_.pmd_->zi.reset();
ws_.rd_done_ = true;
break;
}
else
{
// Frame exhausted but not final
break;
}
ws_.pmd_->zi.write(zs, zlib::Flush::sync, ec);
if(! ws_.check_ok(ec))
goto upcall;
// A zero rd_msg_max_ disables the size check
if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
{
// _Fail the WebSocket Connection_
code_ = close_code::too_big;
ev_ = error::failed;
goto close;
}
// Account for what the inflater produced and consumed
cb_.consume(zs.total_out);
ws_.rd_size_ += zs.total_out;
ws_.rd_remain_ -= zs.total_in;
ws_.rd_buf_.consume(zs.total_in);
bytes_written_ += zs.total_out;
}
if(ws_.rd_op_ == detail::opcode::text)
{
// check utf8
// bs_ is never consumed, so this prefix spans all the
// bytes this operation has written to the caller.
if(! ws_.rd_utf8_.write(
buffers_prefix(bytes_written_, bs_)) || (
ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
code_ = close_code::bad_payload;
ev_ = error::failed;
goto close;
}
}
}
goto upcall;
// Closing sequence. Uses code_ and ev_, which were stored
// before jumping here because the locals do not survive
// the yields below.
close:
if(! ws_.wr_block_)
{
// Acquire the write block
ws_.wr_block_ = tok_;
// Make sure the stream is open
BOOST_ASSERT(ws_.status_ == status::open);
}
else
{
// Suspend
BOOST_ASSERT(ws_.wr_block_ != tok_);
BOOST_ASIO_CORO_YIELD
ws_.paused_rd_.save(std::move(*this));
// Acquire the write block
BOOST_ASSERT(! ws_.wr_block_);
ws_.wr_block_ = tok_;
// Resume
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(), std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
// Make sure the stream is open
if(! ws_.check_open(ec))
goto upcall;
}
// Set the status
ws_.status_ = status::closing;
// Only send a close frame if one was not already written
if(! ws_.wr_close_)
{
ws_.wr_close_ = true;
// Serialize close frame
ws_.rd_fb_.reset();
ws_.template write_close<
flat_static_buffer_base>(
ws_.rd_fb_, code_);
// Send close frame
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(
ws_.stream_, ws_.rd_fb_.data(),
std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(! ws_.check_ok(ec))
goto upcall;
}
// Teardown
using beast::websocket::async_teardown;
BOOST_ASSERT(ws_.wr_block_ == tok_);
BOOST_ASIO_CORO_YIELD
async_teardown(ws_.role_,
ws_.stream_, std::move(*this));
BOOST_ASSERT(ws_.wr_block_ == tok_);
if(ec == boost::asio::error::eof)
{
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
}
// A teardown error takes precedence; otherwise report
// the error stored when the close sequence was entered.
if(! ec)
ec = ev_;
if(ec && ec != error::closed)
ws_.status_ = status::failed;
else
ws_.status_ = status::closed;
ws_.close();
// Common exit: give back whichever blocks this operation
// still owns and wake paused operations.
upcall:
if(ws_.rd_block_ == tok_)
ws_.rd_block_.reset();
ws_.paused_r_close_.maybe_invoke();
if(ws_.wr_block_ == tok_)
{
ws_.wr_block_.reset();
ws_.paused_close_.maybe_invoke() ||
ws_.paused_ping_.maybe_invoke() ||
ws_.paused_wr_.maybe_invoke();
}
// cont_ is false only when this call came in with
// cont == false (as the initiating calls do); in that case
// the final handler is posted rather than invoked inline.
if(! cont_)
return boost::asio::post(
ws_.stream_.get_executor(),
bind_handler(std::move(h_),
ec, bytes_written_));
h_(ec, bytes_written_);
}
}
//------------------------------------------------------------------------------
/*  Composed asynchronous operation behind the DynamicBuffer
    overloads of async_read and async_read_some.

    It repeatedly prepares space in the dynamic buffer and runs
    a read_some_op into it. State:

      h_              the caller's final completion handler
      ws_             the websocket stream being read
      b_              the caller's dynamic buffer (by reference)
      limit_          upper bound for each prepare(); a limit of
                      zero passed to the constructor is stored
                      as the largest std::size_t
      bytes_written_  running total reported to the handler
      some_           when true, stop after one read_some_op
*/
template<class NextLayer>
template<
    class DynamicBuffer,
    class Handler>
class stream<NextLayer>::read_op
    : public boost::asio::coroutine
{
    // Executor type reported by the owning stream
    using stream_executor_type = decltype(
        std::declval<stream<NextLayer>&>().get_executor());

    Handler h_;
    stream<NextLayer>& ws_;
    DynamicBuffer& b_;
    std::size_t limit_;
    std::size_t bytes_written_ = 0;
    bool some_;

public:
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    using executor_type =
        boost::asio::associated_executor_t<
            Handler, stream_executor_type>;

    read_op(read_op const&) = default;
    read_op(read_op&&) = default;

    // Capture the final handler, the stream, the destination
    // buffer, the per-read limit (0 means unlimited) and
    // whether a single partial read is sufficient.
    template<class DeducedHandler>
    read_op(
        DeducedHandler&& final_handler,
        stream<NextLayer>& owner,
        DynamicBuffer& storage,
        std::size_t limit,
        bool some)
        : h_(std::forward<DeducedHandler>(final_handler))
        , ws_(owner)
        , b_(storage)
        , limit_(limit != 0
            ? limit
            : (std::numeric_limits<std::size_t>::max)())
        , some_(some)
    {
    }

    // Allocator associated with the final handler
    allocator_type
    get_allocator() const noexcept
    {
        return boost::asio::get_associated_allocator(h_);
    }

    // Executor associated with the final handler, with
    // the stream's executor as the fallback
    executor_type
    get_executor() const noexcept
    {
        return boost::asio::get_associated_executor(
            h_, ws_.get_executor());
    }

    // Coroutine entry point; see the out-of-line definition
    void operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0);

    // Continuation status is delegated entirely to the
    // final handler's own hook.
    friend
    bool asio_handler_is_continuation(read_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        auto* const inner = std::addressof(op->h_);
        return asio_handler_is_continuation(inner);
    }
};
/*  Coroutine body of read_op.

    Loop: prepare up to limit_ bytes of writable space in b_,
    run a read_some_op into that space, commit what was read.
    The loop ends after one pass when some_ is true, when the
    stream reports the message as done, or on the first error.
    The final handler then receives (ec, bytes_written_).

    ec                 result of the read_some_op (or of the
                       posted overflow completion) that resumed
                       the coroutine
    bytes_transferred  bytes written by that read_some_op
*/
template<class NextLayer>
template<class DynamicBuffer, class Handler>
void
stream<NextLayer>::
read_op<DynamicBuffer, Handler>::
operator()(
error_code ec,
std::size_t bytes_transferred)
{
using beast::detail::clamp;
using buffers_type = typename
DynamicBuffer::mutable_buffers_type;
// Local to each invocation: it is filled and handed to the
// read_some_op within the same call, so it does not need to
// survive a yield.
boost::optional<buffers_type> mb;
BOOST_ASIO_CORO_REENTER(*this)
{
do
{
try
{
mb.emplace(b_.prepare(clamp(
ws_.read_size_hint(b_), limit_)));
}
catch(std::length_error const&)
{
// prepare() could not supply the requested space
ec = error::buffer_overflow;
}
if(ec)
{
// Deliver the overflow through the executor: the posted
// completion re-enters here with ec set, then breaks out
// to the final handler call below.
BOOST_ASIO_CORO_YIELD
boost::asio::post(
ws_.get_executor(),
bind_handler(std::move(*this),
error::buffer_overflow, 0));
break;
}
// Start the inner read with cont == false; this object is
// moved in as the inner operation's completion handler.
BOOST_ASIO_CORO_YIELD
read_some_op<buffers_type, read_op>{
std::move(*this), ws_, *mb}(
{}, 0, false);
if(ec)
break;
b_.commit(bytes_transferred);
bytes_written_ += bytes_transferred;
}
while(! some_ && ! ws_.is_message_done());
h_(ec, bytes_written_);
}
}
//------------------------------------------------------------------------------
/*  Read a complete message into a dynamic buffer (throwing).

    Forwards to the error_code overload and converts a set
    error into a thrown system_error.

    Returns the number of bytes appended to `buffer`.
*/
template<class NextLayer>
template<class DynamicBuffer>
std::size_t
stream<NextLayer>::
read(DynamicBuffer& buffer)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");

    error_code result;
    auto const total = read(buffer, result);
    if(result)
        BOOST_THROW_EXCEPTION(system_error{result});
    return total;
}
/*  Read a complete message into a dynamic buffer.

    Calls read_some (with no size limit) until the stream
    reports the message as done or an error is set in `ec`.

    Returns the number of bytes appended to `buffer`, which
    may be non-zero even when `ec` is set.
*/
template<class NextLayer>
template<class DynamicBuffer>
std::size_t
stream<NextLayer>::
read(DynamicBuffer& buffer, error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");

    std::size_t total = 0;
    for(;;)
    {
        // A limit of zero means "no limit"
        total += read_some(buffer, 0, ec);
        if(ec)
            break;
        if(is_message_done())
            break;
    }
    return total;
}
/*  Start an asynchronous read of a complete message into a
    dynamic buffer.

    Launches a read_op configured with no per-read limit and
    with `some` set to false, so it keeps reading until the
    stream reports the message as done or an error occurs.

    buffer   dynamic buffer receiving the message; held by
             reference by the operation
    handler  completion token, invoked with
             (error_code, std::size_t)

    Returns whatever the completion token's async_result
    produces.
*/
template<class NextLayer>
template<class DynamicBuffer, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
    ReadHandler, void(error_code, std::size_t))
stream<NextLayer>::
async_read(DynamicBuffer& buffer, ReadHandler&& handler)
{
    // Diagnostic previously repeated the word "requirements"
    static_assert(is_async_stream<next_layer_type>::value,
        "AsyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");
    boost::asio::async_completion<
        ReadHandler, void(error_code, std::size_t)> init{handler};
    read_op<
        DynamicBuffer,
        BOOST_ASIO_HANDLER_TYPE(
            ReadHandler, void(error_code, std::size_t))>{
        init.completion_handler,
        *this,
        buffer,
        0,          // limit: zero is stored as "unlimited"
        false}();   // some: read until the message is done
    return init.result.get();
}
//------------------------------------------------------------------------------
/*  Read some message data into a dynamic buffer (throwing).

    Forwards to the error_code overload and converts a set
    error into a thrown system_error.

    limit  upper bound on the bytes to read; passed through
           unchanged to the error_code overload

    Returns the number of bytes appended to `buffer`.
*/
template<class NextLayer>
template<class DynamicBuffer>
std::size_t
stream<NextLayer>::
read_some(
    DynamicBuffer& buffer,
    std::size_t limit)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");

    error_code result;
    auto const count = read_some(buffer, limit, result);
    if(result)
        BOOST_THROW_EXCEPTION(system_error{result});
    return count;
}
/*  Read some message data into a dynamic buffer.

    Prepares writable space in `buffer`, sized by the stream's
    read size hint and capped by `limit`, reads into it with
    the buffer-sequence overload, then commits what was read.

    limit  upper bound on the bytes to read; zero means the
           largest std::size_t
    ec     set to error::buffer_overflow if prepare() throws
           std::length_error, otherwise set by the inner read

    Returns the number of bytes appended to `buffer`.
*/
template<class NextLayer>
template<class DynamicBuffer>
std::size_t
stream<NextLayer>::
read_some(
    DynamicBuffer& buffer,
    std::size_t limit,
    error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");

    using beast::detail::clamp;
    using buffers_type = typename
        DynamicBuffer::mutable_buffers_type;

    // Zero selects "no limit"
    std::size_t const cap = limit != 0
        ? limit
        : (std::numeric_limits<std::size_t>::max)();
    auto const size = clamp(read_size_hint(buffer), cap);
    BOOST_ASSERT(size > 0);

    boost::optional<buffers_type> dest;
    try
    {
        dest.emplace(buffer.prepare(size));
    }
    catch(std::length_error const&)
    {
        // The buffer cannot provide the requested space
        ec = error::buffer_overflow;
        return 0;
    }

    auto const count = read_some(*dest, ec);
    buffer.commit(count);
    return count;
}
/*  Start an asynchronous read of some message data into a
    dynamic buffer.

    Launches a read_op with `some` set to true, so it completes
    after a single inner read instead of looping until the
    message is done.

    buffer   dynamic buffer receiving the data; held by
             reference by the operation
    limit    upper bound on the bytes prepared for the read;
             zero is stored by read_op as "unlimited"
    handler  completion token, invoked with
             (error_code, std::size_t)

    Returns whatever the completion token's async_result
    produces.
*/
template<class NextLayer>
template<class DynamicBuffer, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
    ReadHandler, void(error_code, std::size_t))
stream<NextLayer>::
async_read_some(
    DynamicBuffer& buffer,
    std::size_t limit,
    ReadHandler&& handler)
{
    // Diagnostic previously repeated the word "requirements"
    static_assert(is_async_stream<next_layer_type>::value,
        "AsyncStream requirements not met");
    static_assert(
        boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
        "DynamicBuffer requirements not met");
    boost::asio::async_completion<ReadHandler,
        void(error_code, std::size_t)> init{handler};
    read_op<
        DynamicBuffer,
        BOOST_ASIO_HANDLER_TYPE(
            ReadHandler, void(error_code, std::size_t))>{
        init.completion_handler,
        *this,
        buffer,
        limit,
        true}({}, 0);   // some: one inner read is enough
    return init.result.get();
}
//------------------------------------------------------------------------------
/*  Read some message data into a buffer sequence (throwing).

    Forwards to the error_code overload and converts a set
    error into a thrown system_error.

    Returns the number of bytes written to `buffers`.
*/
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
stream<NextLayer>::
read_some(
    MutableBufferSequence const& buffers)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(boost::asio::is_mutable_buffer_sequence<
            MutableBufferSequence>::value,
        "MutableBufferSequence requirements not met");

    error_code result;
    auto const count = read_some(buffers, result);
    if(result)
        BOOST_THROW_EXCEPTION(system_error{result});
    return count;
}
/*  Synchronously read some message data into a buffer sequence.

    Steps, in order:

    1. Fail immediately (returning 0) if check_open reports the
       stream is not open.
    2. When no payload remains, parse a frame header, reading
       from the next layer as needed. Ping, pong and close
       control frames are handled in place: a ping is answered
       with a pong, a pong only triggers the control callback,
       and a close frame ends the call through do_fail.
    3. Transfer message payload into `buffers`, either verbatim
       or through the inflater pmd_->zi.
    4. For text messages, validate the transferred bytes as
       UTF-8 and fail the connection on invalid data.

    buffers  destination for message payload
    ec       cleared on entry; set on failure

    Returns the number of payload bytes written to `buffers`.
*/
template<class NextLayer>
template<class MutableBufferSequence>
std::size_t
stream<NextLayer>::
read_some(
MutableBufferSequence const& buffers,
error_code& ec)
{
static_assert(is_sync_stream<next_layer_type>::value,
"SyncStream requirements not met");
static_assert(boost::asio::is_mutable_buffer_sequence<
MutableBufferSequence>::value,
"MutableBufferSequence requirements not met");
using beast::detail::clamp;
// NOTE(review): `buffer` does not appear to be used below
using boost::asio::buffer;
using boost::asio::buffer_size;
// Output of parse_fh / read_close; a value other than
// close_code::none means the connection must be failed
close_code code{};
std::size_t bytes_written = 0;
ec.assign(0, ec.category());
// Make sure the stream is open
if(! check_open(ec))
return 0;
loop:
// See if we need to read a frame header. This
// condition is structured to give the decompressor
// a chance to emit the final empty deflate block
//
if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
{
// Read frame header
while(! parse_fh(rd_fh_, rd_buf_, code))
{
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
do_fail(code, error::failed, ec);
return bytes_written;
}
// Header incomplete: pull more bytes from the next layer
auto const bytes_transferred =
stream_.read_some(
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
if(! check_ok(ec))
return bytes_written;
rd_buf_.commit(bytes_transferred);
}
// Immediately apply the mask to the portion
// of the buffer holding payload data.
if(rd_fh_.len > 0 && rd_fh_.mask)
detail::mask_inplace(buffers_prefix(
clamp(rd_fh_.len), rd_buf_.data()),
rd_key_);
if(detail::is_control(rd_fh_.op))
{
// Get control frame payload
auto const b = buffers_prefix(
clamp(rd_fh_.len), rd_buf_.data());
auto const len = buffer_size(b);
BOOST_ASSERT(len == rd_fh_.len);
// Clear this otherwise the next
// frame will be considered final.
rd_fh_.fin = false;
// Handle ping frame
if(rd_fh_.op == detail::opcode::ping)
{
ping_data payload;
detail::read_ping(payload, b);
rd_buf_.consume(len);
if(wr_close_)
{
// Ignore ping when closing
goto loop;
}
if(ctrl_cb_)
ctrl_cb_(frame_type::ping, payload);
// Answer with a pong carrying the same payload
detail::frame_buffer fb;
write_ping<flat_static_buffer_base>(fb,
detail::opcode::pong, payload);
boost::asio::write(stream_, fb.data(), ec);
if(! check_ok(ec))
return bytes_written;
goto loop;
}
// Handle pong frame
if(rd_fh_.op == detail::opcode::pong)
{
ping_data payload;
detail::read_ping(payload, b);
rd_buf_.consume(len);
if(ctrl_cb_)
ctrl_cb_(frame_type::pong, payload);
goto loop;
}
// Handle close frame
BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
{
BOOST_ASSERT(! rd_close_);
rd_close_ = true;
close_reason cr;
detail::read_close(cr, b, code);
if(code != close_code::none)
{
// _Fail the WebSocket Connection_
do_fail(code, error::failed, ec);
return bytes_written;
}
cr_ = cr;
rd_buf_.consume(len);
if(ctrl_cb_)
ctrl_cb_(frame_type::close, cr_.reason);
BOOST_ASSERT(! wr_close_);
// _Start the WebSocket Closing Handshake_
// Reply with the peer's code, or `normal` when the
// peer's close frame carried none.
do_fail(
cr.code == close_code::none ?
close_code::normal :
static_cast<close_code>(cr.code),
error::closed, ec);
return bytes_written;
}
}
if(rd_fh_.len == 0 && ! rd_fh_.fin)
{
// Empty non-final frame
goto loop;
}
rd_done_ = false;
}
else
{
ec.assign(0, ec.category());
}
// Uncompressed path: taken when pmd_ is unset or
// pmd_->rd_set is false.
if(! pmd_ || ! pmd_->rd_set)
{
if(rd_remain_ > 0)
{
if(rd_buf_.size() == 0 && rd_buf_.max_size() >
(std::min)(clamp(rd_remain_),
buffer_size(buffers)))
{
// Fill the read buffer first, otherwise we
// get fewer bytes at the cost of one I/O.
rd_buf_.commit(stream_.read_some(
rd_buf_.prepare(read_size(rd_buf_,
rd_buf_.max_size())), ec));
if(! check_ok(ec))
return bytes_written;
// Unmask only the part belonging to this frame
if(rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(rd_remain_),
rd_buf_.data()), rd_key_);
}
if(rd_buf_.size() > 0)
{
// Copy from the read buffer.
// The mask was already applied.
auto const bytes_transferred =
buffer_copy(buffers, rd_buf_.data(),
clamp(rd_remain_));
auto const mb = buffers_prefix(
bytes_transferred, buffers);
rd_remain_ -= bytes_transferred;
if(rd_op_ == detail::opcode::text)
{
// finish() is only consulted once the final
// frame's payload is complete
if(! rd_utf8_.write(mb) ||
(rd_remain_ == 0 && rd_fh_.fin &&
! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(
close_code::bad_payload,
error::failed,
ec);
return bytes_written;
}
}
bytes_written += bytes_transferred;
rd_size_ += bytes_transferred;
rd_buf_.consume(bytes_transferred);
}
else
{
// Read into caller's buffer
BOOST_ASSERT(rd_remain_ > 0);
BOOST_ASSERT(buffer_size(buffers) > 0);
BOOST_ASSERT(buffer_size(buffers_prefix(
clamp(rd_remain_), buffers)) > 0);
auto const bytes_transferred =
stream_.read_some(buffers_prefix(
clamp(rd_remain_), buffers), ec);
if(! check_ok(ec))
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
auto const mb = buffers_prefix(
bytes_transferred, buffers);
rd_remain_ -= bytes_transferred;
// Data landed directly in the caller's buffer,
// so unmask it there
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
if(rd_op_ == detail::opcode::text)
{
if(! rd_utf8_.write(mb) ||
(rd_remain_ == 0 && rd_fh_.fin &&
! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,
error::failed, ec);
return bytes_written;
}
}
bytes_written += bytes_transferred;
rd_size_ += bytes_transferred;
}
}
rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
}
else
{
// Read compressed message frame payload:
// inflate even if rd_fh_.len == 0, otherwise we
// never emit the end-of-stream deflate block.
//
// did_read allows at most one read from the next layer
// during this call
bool did_read = false;
// Consumable view of the caller's buffers; `buffers`
// itself stays untouched for the UTF-8 check below
buffers_suffix<MutableBufferSequence> cb{buffers};
while(buffer_size(cb) > 0)
{
// Output goes to the first buffer remaining in cb
zlib::z_params zs;
{
auto const out = buffers_front(cb);
zs.next_out = out.data();
zs.avail_out = out.size();
BOOST_ASSERT(zs.avail_out > 0);
}
if(rd_remain_ > 0)
{
if(rd_buf_.size() > 0)
{
// use what's there
auto const in = buffers_prefix(
clamp(rd_remain_), buffers_front(
rd_buf_.data()));
zs.avail_in = in.size();
zs.next_in = in.data();
}
else if(! did_read)
{
// read new
auto const bytes_transferred =
stream_.read_some(
rd_buf_.prepare(read_size(
rd_buf_, rd_buf_.max_size())),
ec);
if(! check_ok(ec))
return bytes_written;
BOOST_ASSERT(bytes_transferred > 0);
rd_buf_.commit(bytes_transferred);
if(rd_fh_.mask)
detail::mask_inplace(
buffers_prefix(clamp(rd_remain_),
rd_buf_.data()), rd_key_);
auto const in = buffers_prefix(
clamp(rd_remain_), buffers_front(
rd_buf_.data()));
zs.avail_in = in.size();
zs.next_in = in.data();
did_read = true;
}
else
{
// Nothing buffered and the one permitted read
// was already used
break;
}
}
else if(rd_fh_.fin)
{
// append the empty block codes
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
zs.next_in = empty_block;
zs.avail_in = sizeof(empty_block);
pmd_->zi.write(zs, zlib::Flush::sync, ec);
if(! ec)
{
// https://github.com/madler/zlib/issues/280
if(zs.total_out > 0)
ec = error::partial_deflate_block;
}
if(! check_ok(ec))
return bytes_written;
// Reset the inflater when the peer's side of the
// connection was configured for no context takeover
if(
(role_ == role_type::client &&
pmd_config_.server_no_context_takeover) ||
(role_ == role_type::server &&
pmd_config_.client_no_context_takeover))
pmd_->zi.reset();
rd_done_ = true;
break;
}
else
{
// Frame exhausted but not final
break;
}
pmd_->zi.write(zs, zlib::Flush::sync, ec);
if(! check_ok(ec))
return bytes_written;
// A zero rd_msg_max_ disables the size check
if(rd_msg_max_ && beast::detail::sum_exceeds(
rd_size_, zs.total_out, rd_msg_max_))
{
do_fail(close_code::too_big,
error::failed, ec);
return bytes_written;
}
// Account for what the inflater produced and consumed
cb.consume(zs.total_out);
rd_size_ += zs.total_out;
rd_remain_ -= zs.total_in;
rd_buf_.consume(zs.total_in);
bytes_written += zs.total_out;
}
if(rd_op_ == detail::opcode::text)
{
// check utf8
// The prefix of the unconsumed `buffers` spans all the
// bytes this call has written.
if(! rd_utf8_.write(
buffers_prefix(bytes_written, buffers)) || (
rd_done_ && ! rd_utf8_.finish()))
{
// _Fail the WebSocket Connection_
do_fail(close_code::bad_payload,
error::failed, ec);
return bytes_written;
}
}
}
return bytes_written;
}
/*  Start an asynchronous read of some message data into a
    buffer sequence.

    Launches a read_some_op directly on the caller's buffers.
    The first invocation passes cont == false, which the
    operation uses to post (rather than inline) the final
    handler if it completes without suspending.

    buffers  destination for message payload; the operation
             stores a copy of the sequence object
    handler  completion token, invoked with
             (error_code, std::size_t)

    Returns whatever the completion token's async_result
    produces.
*/
template<class NextLayer>
template<class MutableBufferSequence, class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
    ReadHandler, void(error_code, std::size_t))
stream<NextLayer>::
async_read_some(
    MutableBufferSequence const& buffers,
    ReadHandler&& handler)
{
    // Diagnostic previously repeated the word "requirements"
    static_assert(is_async_stream<next_layer_type>::value,
        "AsyncStream requirements not met");
    static_assert(boost::asio::is_mutable_buffer_sequence<
            MutableBufferSequence>::value,
        "MutableBufferSequence requirements not met");
    boost::asio::async_completion<ReadHandler,
        void(error_code, std::size_t)> init{handler};
    read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
        ReadHandler, void(error_code, std::size_t))>{
            init.completion_handler, *this, buffers}(
                {}, 0, false);
    return init.result.get();
}
} // websocket
} // beast
} // boost
#endif