Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

Loading...
Searching...
No Matches
connection.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
9
10#include <boost/redis/detail/connection_base.hpp>
11#include <boost/redis/logger.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/asio/io_context.hpp>
14#include <boost/asio/coroutine.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/asio/any_io_executor.hpp>
17#include <boost/asio/any_completion_handler.hpp>
18
19#include <chrono>
20#include <memory>
21#include <limits>
22
23namespace boost::redis {
24namespace detail
25{
// Composed asynchronous operation that keeps a connection running and
// re-establishes it after it is lost.
//
// Each pass of the loop:
//   1. runs the connection through conn_->impl_.async_run();
//   2. when that run finishes, cancels pending receive operations and
//      reports the loss to the logger;
//   3. completes if reconnection is disabled or the operation was cancelled;
//   4. otherwise waits cfg_.reconnect_wait_interval on the connection's
//      timer, resets the stream and starts over.
//
// Connection: connection type exposing impl_, cfg_, timer_, cancel(),
//             will_reconnect() and reset_stream().
// Logger:     logger type exposing on_connection_lost().
26template <class Connection, class Logger>
27struct reconnection_op {
 // Connection driven by this operation; held as a plain pointer.
28 Connection* conn_ = nullptr;
 // Logger passed to every run and notified when the connection is lost.
29 Logger logger_;
 // Stackless-coroutine state used by the BOOST_ASIO_CORO_* macros below.
30 asio::coroutine coro_{};
31
 // Entry point and resumption point of the operation.
 // self: composed-operation handle, moved into each asynchronous call and
 //       used to deliver the final completion.
 // ec:   result of the asynchronous step that has just completed
 //       (default-constructed on the initial invocation).
32 template <class Self>
33 void operator()(Self& self, system::error_code ec = {})
34 {
35 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
36 {
 // Run the connection; execution resumes here, with ec set, once the run ends.
37 BOOST_ASIO_CORO_YIELD
38 conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
 // The run is over: cancel pending receive operations and report the loss.
39 conn_->cancel(operation::receive);
40 logger_.on_connection_lost(ec);
 // Stop when reconnection is disabled or this operation was cancelled.
41 if (!conn_->will_reconnect() || is_cancelled(self)) {
42 conn_->cancel(operation::reconnection);
 // Never complete with success: forward the run error if there is one,
 // otherwise report operation_aborted.
43 self.complete(!!ec ? ec : asio::error::operation_aborted);
44 return;
45 }
46
 // Wait for the configured interval before attempting to reconnect.
47 conn_->timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
48 BOOST_ASIO_CORO_YIELD
49 conn_->timer_.async_wait(std::move(self));
 // NOTE(review): BOOST_REDIS_CHECK_OP0 is defined elsewhere; presumably it
 // completes the operation and returns when the wait failed or was
 // cancelled -- confirm against the macro definition.
50 BOOST_REDIS_CHECK_OP0(;)
 // Reconnection may have been disabled while the timer was pending, so
 // check again before starting a new run.
51 if (!conn_->will_reconnect()) {
52 self.complete(asio::error::operation_aborted);
53 return;
54 }
 // Reset the underlying stream before the next pass calls async_run again.
55 conn_->reset_stream();
56 }
57 }
58};
59} // detail
60
71template <class Executor>
73public:
75 using executor_type = Executor;
76
79 { return impl_.get_executor(); }
80
82 template <class Executor1>
84 {
87 };
88
96 explicit
99 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
100 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
101 : impl_{ex, std::move(ctx), max_read_size}
102 , timer_{ex}
103 { }
104
106 explicit
108 asio::io_context& ioc,
109 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
110 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
111 : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
112 { }
113
151 template <
152 class Logger = logger,
153 class CompletionToken = asio::default_completion_token_t<executor_type>>
154 auto
156 config const& cfg = {},
157 Logger l = Logger{},
158 CompletionToken token = CompletionToken{})
159 {
160 using this_type = basic_connection<executor_type>;
161
162 cfg_ = cfg;
163 l.set_prefix(cfg_.log_prefix);
164 return asio::async_compose
165 < CompletionToken
166 , void(system::error_code)
167 >(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
168 }
169
192 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
193 auto async_receive(CompletionToken token = CompletionToken{})
194 { return impl_.async_receive(std::move(token)); }
195
196
208 std::size_t receive(system::error_code& ec)
209 {
210 return impl_.receive(ec);
211 }
212
213 template <
214 class Response = ignore_t,
215 class CompletionToken = asio::default_completion_token_t<executor_type>
216 >
217 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
218 auto
220 Response& response,
221 CompletionToken token = CompletionToken{})
222 {
223 return impl_.async_receive(response, token);
224 }
225
249 template <
250 class Response = ignore_t,
251 class CompletionToken = asio::default_completion_token_t<executor_type>
252 >
253 auto
255 request const& req,
256 Response& resp = ignore,
257 CompletionToken&& token = CompletionToken{})
258 {
259 return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
260 }
261
274 {
275 switch (op) {
277 case operation::all:
278 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
279 timer_.cancel();
280 break;
281 default: /* ignore */;
282 }
283
284 impl_.cancel(op);
285 }
286
288 bool will_reconnect() const noexcept
289 { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
290
292 auto const& get_ssl_context() const noexcept
293 { return impl_.get_ssl_context();}
294
297 { impl_.reset_stream(); }
298
300 auto& next_layer() noexcept
301 { return impl_.next_layer(); }
302
304 auto const& next_layer() const noexcept
305 { return impl_.next_layer(); }
306
308 template <class Response>
310 { impl_.set_receive_response(response); }
311
313 usage get_usage() const noexcept
314 { return impl_.get_usage(); }
315
316private:
317 using timer_type =
318 asio::basic_waitable_timer<
319 std::chrono::steady_clock,
320 asio::wait_traits<std::chrono::steady_clock>,
321 Executor>;
322
323 template <class, class> friend struct detail::reconnection_op;
324
325 config cfg_;
327 timer_type timer_;
328};
329
340public:
342 using executor_type = asio::any_io_executor;
343
345 explicit
347 executor_type ex,
348 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
349 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
350
352 explicit
354 asio::io_context& ioc,
355 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
356 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
357
360 { return impl_.get_executor(); }
361
363 template <class CompletionToken>
364 auto async_run(config const& cfg, logger l, CompletionToken token)
365 {
366 return asio::async_initiate<
367 CompletionToken, void(boost::system::error_code)>(
368 [](auto handler, connection* self, config const* cfg, logger l)
369 {
370 self->async_run_impl(*cfg, l, std::move(handler));
371 }, token, this, &cfg, l);
372 }
373
375 template <class Response, class CompletionToken>
376 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
377 auto async_receive(Response& response, CompletionToken token)
378 {
379 return impl_.async_receive(response, std::move(token));
380 }
381
383 template <class CompletionToken>
384 auto async_receive(CompletionToken token)
385 { return impl_.async_receive(std::move(token)); }
386
388 std::size_t receive(system::error_code& ec)
389 {
390 return impl_.receive(ec);
391 }
392
394 template <class Response, class CompletionToken>
395 auto async_exec(request const& req, Response& resp, CompletionToken token)
396 {
397 return impl_.async_exec(req, resp, std::move(token));
398 }
399
402
404 bool will_reconnect() const noexcept
405 { return impl_.will_reconnect();}
406
408 auto& next_layer() noexcept
409 { return impl_.next_layer(); }
410
412 auto const& next_layer() const noexcept
413 { return impl_.next_layer(); }
414
417 { impl_.reset_stream();}
418
420 template <class Response>
423
425 usage get_usage() const noexcept
426 { return impl_.get_usage(); }
427
429 auto const& get_ssl_context() const noexcept
430 { return impl_.get_ssl_context();}
431
432private:
433 void
434 async_run_impl(
435 config const& cfg,
436 logger l,
437 asio::any_completion_handler<void(boost::system::error_code)> token);
438
440};
441
442} // boost::redis
443
444#endif // BOOST_REDIS_CONNECTION_HPP
An SSL connection to the Redis server.
Definition: connection.hpp:72
void reset_stream()
Resets the underlying stream.
Definition: connection.hpp:296
bool will_reconnect() const noexcept
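Returns true if the connection is configured to reconnect, i.e. the reconnect wait interval is non-zero; returns false once reconnection has been canceled.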
Definition: connection.hpp:288
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:78
auto async_exec(request const &req, Response &resp=ignore, CompletionToken &&token=CompletionToken{})
Executes commands on the Redis server asynchronously.
Definition: connection.hpp:254
usage get_usage() const noexcept
Returns connection usage information.
Definition: connection.hpp:313
auto const & get_ssl_context() const noexcept
Returns the ssl context.
Definition: connection.hpp:292
basic_connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from a context.
Definition: connection.hpp:107
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
Definition: connection.hpp:304
basic_connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructor.
Definition: connection.hpp:97
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
Definition: connection.hpp:309
void cancel(operation op=operation::all)
Cancel operations.
Definition: connection.hpp:273
auto & next_layer() noexcept
Returns a reference to the next layer.
Definition: connection.hpp:300
std::size_t receive(system::error_code &ec)
Receives server pushes synchronously without blocking.
Definition: connection.hpp:208
auto async_receive(CompletionToken token=CompletionToken{})
Receives server side pushes asynchronously.
Definition: connection.hpp:193
auto async_run(config const &cfg={}, Logger l=Logger{}, CompletionToken token=CompletionToken{})
Starts underlying connection operations.
Definition: connection.hpp:155
Executor executor_type
Executor type.
Definition: connection.hpp:75
Rebinds the socket type to another executor.
Definition: connection.hpp:84
A basic_connection that type erases the executor.
Definition: connection.hpp:339
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
Definition: connection.hpp:404
auto async_exec(request const &req, Response &resp, CompletionToken token)
Calls boost::redis::basic_connection::async_exec.
Definition: connection.hpp:395
auto async_receive(Response &response, CompletionToken token)
Calls boost::redis::basic_connection::async_receive.
Definition: connection.hpp:377
std::size_t receive(system::error_code &ec)
Calls boost::redis::basic_connection::receive.
Definition: connection.hpp:388
auto async_run(config const &cfg, logger l, CompletionToken token)
Calls boost::redis::basic_connection::async_run.
Definition: connection.hpp:364
connection(executor_type ex, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from an executor.
asio::any_io_executor executor_type
Executor type.
Definition: connection.hpp:342
connection(asio::io_context &ioc, asio::ssl::context ctx=asio::ssl::context{asio::ssl::context::tlsv12_client}, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Constructs from a context.
auto & next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:408
auto const & get_ssl_context() const noexcept
Returns the ssl context.
Definition: connection.hpp:429
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
Definition: connection.hpp:421
auto const & next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:412
void cancel(operation op=operation::all)
Calls boost::redis::basic_connection::cancel.
usage get_usage() const noexcept
Returns connection usage information.
Definition: connection.hpp:425
auto async_receive(CompletionToken token)
Calls boost::redis::basic_connection::async_receive.
Definition: connection.hpp:384
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:359
void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
Definition: connection.hpp:416
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
void reset_stream()
Resets the underlying stream.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void cancel(operation op)
Cancels specific operations.
Logger class.
Definition: logger.hpp:27
Creates Redis requests.
Definition: request.hpp:46
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
Definition: config.hpp:80
std::string log_prefix
Logger prefix, see boost::redis::logger.
Definition: config.hpp:59
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
Definition: ignore.hpp:31
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition: response.hpp:25
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition: config.hpp:30
Connection usage information.
Definition: usage.hpp:21