5#include "../type_traits.hpp"
7#include "../detail/action_queue.hpp"
8#include "../http/request.hpp"
9#include "../websocket/stream.hpp"
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/post.hpp>
13#include <boost/beast/core/error.hpp>
14#include <fmt/format.h>
15#include <spdlog/spdlog.h>
34 template<
bool isClient>
36 public std::enable_shared_from_this<connection<isClient>>
62 m_logger->trace(
"destructor()");
73 std::shared_ptr<spdlog::logger>
105 std::shared_ptr<connection>
106 make(
const std::shared_ptr<spdlog::logger>
logger,
stream&& ws,
const std::string& agent_string)
112 return std::shared_ptr<connection>{me};
128 template<concepts::accept_handler Callback>
130 connect(
const boost::asio::ip::tcp::resolver::results_type& target,
const std::string& resource, Callback&& done)
133 m_logger->trace(
"connect()");
135 if (m_state != state::inactive)
136 throw std::logic_error{
"connect() called on already active websocket connection"};
139 m_ws.
get_lowest_layer([&, me = this->shared_from_this(),
this, done = std::forward<Callback>(done), resource](
auto& sock)
mutable {
140 sock.expires_after(std::chrono::seconds(30));
145 [
this, me, target, done = std::forward<Callback>(done), resource](
auto ec,
auto ep)
mutable {
149 me->on_connect(ec, ep, resource, [
this, done = std::forward<Callback>(done)](
auto ec)
mutable {
151 std::invoke(std::forward<
decltype(done)>(done), ec);
169 template<
class Body,
class Fields, std::invocable<> Callback>
171 accept(
const boost::beast::http::request<Body, Fields>& req, Callback&& done)
174 m_logger->trace(
"accept()");
176 if (m_state != state::inactive)
177 throw std::logic_error{
"accept() called on already active websocket connection"};
180 m_state = state::handshaking;
185 m_ws.async_accept(req, [
this, me = this->shared_from_this(), done = std::forward<
decltype(done)>(done)](
malloy::error_code ec)
mutable {
186 m_logger->trace(
"on_accept()");
190 m_logger->error(
"on_accept(): {}", ec.message());
197 std::invoke(std::forward<
decltype(done)>(done));
211 disconnect(boost::beast::websocket::close_reason why = boost::beast::websocket::normal)
213 m_logger->trace(
"disconnect()");
215 if (m_state == state::closed || m_state == state::closing)
218 auto build_act = [
this, why, me = this->shared_from_this()](
const auto& on_done)
mutable {
220 if (m_state == state::closed || m_state == state::closing) {
225 do_disconnect(why, on_done);
229 m_write_queue.
push(build_act);
230 m_read_queue.
push(build_act);
241 force_disconnect(boost::beast::websocket::close_reason why = boost::beast::websocket::normal)
243 m_logger->trace(
"force_disconnect()");
245 if (m_state == state::inactive)
246 throw std::logic_error{
"force_disconnect() called on inactive websocket connection"};
248 else if (m_state == state::closed || m_state == state::closing)
251 do_disconnect(why, []{});
269 m_logger->trace(
"read()");
274 me = this->shared_from_this(),
276 done = std::forward<
decltype(done)>(done)
278 (
const auto& on_done)
mutable
280 assert(buff !=
nullptr);
281 m_ws.async_read(*buff, [
this, me, on_done, done = std::forward<
decltype(done)>(done)](
auto ec,
auto size)
mutable {
282 std::invoke(std::forward<
decltype(done)>(done), ec, size);
299 template<concepts::async_read_handler Callback>
303 m_logger->trace(
"send(). payload size: {}", payload.size());
305 m_write_queue.
push([buff = payload, done = std::forward<Callback>(done),
this, me = this->shared_from_this()](
const auto& on_done)
mutable {
306 m_ws.async_write(buff, [
this, me, on_done, done = std::forward<
decltype(done)>(done)](
auto ec,
auto size)
mutable {
308 std::invoke(std::forward<Callback>(done), ec, size);
315 enum class sending_state
321 enum sending_state m_sending_state = sending_state::idling;
322 std::shared_ptr<spdlog::logger> m_logger;
324 std::string m_agent_string;
325 act_queue_t m_write_queue;
326 act_queue_t m_read_queue;
327 std::atomic<state> m_state{ state::inactive };
330 std::shared_ptr<spdlog::logger>
logger, stream&& ws, std::string agent_str) :
331 m_logger(std::move(
logger)),
333 m_agent_string{std::move(agent_str)},
334 m_write_queue{boost::asio::make_strand(m_ws.get_executor())},
335 m_read_queue{boost::asio::make_strand(m_ws.get_executor())}
339 throw std::invalid_argument(
"no valid logger provided.");
345 m_logger->trace(
"go_active()");
348 m_state = state::active;
358 m_logger->trace(
"setup_connection()");
362 boost::beast::websocket::stream_base::timeout::suggested(
363 isClient ? boost::beast::role_type::client : boost::beast::role_type::server)
367 const auto agent_field = isClient ? malloy::http::field::user_agent : malloy::http::field::server;
369 boost::beast::websocket::stream_base::decorator(
370 [
this, agent_field](boost::beast::websocket::request_type& req) {
371 req.set(agent_field, m_agent_string);
378 do_disconnect(boost::beast::websocket::close_reason why,
const std::invocable<>
auto& on_done)
380 m_logger->trace(
"do_disconnect()");
383 m_state = state::closing;
385 m_ws.async_close(why, [me = this->shared_from_this(),
this, on_done](
auto ec) {
387 m_logger->error(
"could not close websocket: '{}'", ec.message());
397 boost::beast::error_code ec,
398 boost::asio::ip::tcp::resolver::results_type::endpoint_type ep,
399 const std::string& resource,
400 concepts::accept_handler
auto&& on_handshake)
402 m_logger->trace(
"on_connect()");
405 m_logger->error(
"on_connect(): {}", ec.message());
409 m_ws.get_lowest_layer([](
auto& s) { s.expires_never(); });
414 const std::string host = fmt::format(
"{}:{}", ep.address().to_string(), ep.port());
416#if MALLOY_FEATURE_TLS
417 if constexpr (isClient) {
420 m_ws.async_handshake_tls(
421 boost::asio::ssl::stream_base::handshake_type::client,
422 [on_handshake = std::forward<
decltype(on_handshake)>(on_handshake), resource, host, me = this->shared_from_this()](
auto ec)
mutable
427 me->on_ready_for_handshake(host, resource, std::forward<
decltype(on_handshake)>(on_handshake));
434 on_ready_for_handshake(host, resource, std::forward<
decltype(on_handshake)>(on_handshake));
438 on_ready_for_handshake(
const std::string& host,
const std::string& resource, concepts::accept_handler
auto&& on_handshake)
440 m_logger->trace(
"on_ready_for_handshake()");
444 m_ws.get_lowest_layer([](
auto& s) { s.expires_never(); });
448 m_ws.async_handshake(
451 std::forward<
decltype(on_handshake)>(on_handshake)
456 on_write(
auto ec,
auto size)
458 m_logger->trace(
"on_write() wrote: '{}' bytes", size);
461 m_logger->error(
"on_write failed for websocket connection: '{}'", ec.message());
469 m_logger->trace(
"on_close()");
471 m_state = state::closed;
void push(act_t act)
Add an action to the queue.
Definition: action_queue.hpp:48
Definition: request.hpp:19
Represents a connection via the WebSocket protocol.
Definition: connection.hpp:37
static std::shared_ptr< connection > make(const std::shared_ptr< spdlog::logger > logger, stream &&ws, const std::string &agent_string)
Construct a new connection object.
Definition: connection.hpp:106
void send(const concepts::const_buffer_sequence auto &payload, Callback &&done)
Send the contents of a buffer to the remote peer.
Definition: connection.hpp:301
state
Definition: connection.hpp:48
void force_disconnect(boost::beast::websocket::close_reason why=boost::beast::websocket::normal)
Same as disconnect, but bypasses all queues and runs immediately.
Definition: connection.hpp:241
void set_binary(const bool enabled)
Definition: connection.hpp:83
void read(concepts::dynamic_buffer auto &buff, concepts::async_read_handler auto &&done)
Read a complete message into a buffer.
Definition: connection.hpp:267
bool binary()
Definition: connection.hpp:93
std::shared_ptr< spdlog::logger > logger() const noexcept
Definition: connection.hpp:74
void accept(const boost::beast::http::request< Body, Fields > &req, Callback &&done)
Accept an incoming connection.
Definition: connection.hpp:171
void connect(const boost::asio::ip::tcp::resolver::results_type &target, const std::string &resource, Callback &&done)
Connect to a remote (websocket) endpoint.
Definition: connection.hpp:130
virtual ~connection() noexcept
Definition: connection.hpp:60
void disconnect(boost::beast::websocket::close_reason why=boost::beast::websocket::normal)
Disconnect/stop/close the connection.
Definition: connection.hpp:211
Websocket stream. May use TLS.
Definition: stream.hpp:50
bool binary() const
Checks whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:179
void get_lowest_layer(Func &&visitor)
Access get_lowest_layer of wrapped stream type.
Definition: stream.hpp:200
void set_binary(const bool enabled)
Controls whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:162
auto get_executor()
Get executor of the underlying stream.
Definition: stream.hpp:211
Definition: type_traits.hpp:44
Definition: type_traits.hpp:35
Definition: type_traits.hpp:41
Definition: connection.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9