3#include "../type_traits.hpp"
4#include "../../core/type_traits.hpp"
5#include "../../core/http/request.hpp"
6#include "../../core/http/response.hpp"
7#include "../../core/http/type_traits.hpp"
9#include <boost/asio/strand.hpp>
10#include <boost/beast/core.hpp>
11#include <spdlog/logger.h>
16namespace malloy::client::http
// Template parameters of the connection type (the class head itself is not
// visible in this excerpt):
//   Derived  - the inheriting type; `*this` is cast to `Derived&` further down.
//   ReqBody  - request body type, constrained by malloy::http::concepts::body.
//   Filter   - constrained by concepts::response_filter; supplies `response_type`.
//   Callback - callable type, stored in a std::optional member (m_cb).
24 template<
class Derived, malloy::http::concepts::body ReqBody, concepts::response_filter Filter,
typename Callback>
// Response type as dictated by the filter.
28 using resp_t =
typename Filter::response_type;
// Alias for the Callback template argument.
29 using callback_t = Callback;
// Constructor.
//   logger     - moved into m_logger.
//   io_ctx     - I/O context; the resolver is created on a new strand of it.
//   body_limit - forwarded to m_parser.body_limit().
// Throws std::invalid_argument. The guarding condition is not visible in
// this excerpt; presumably it fires when no logger was supplied (confirm).
31 connection(std::shared_ptr<spdlog::logger> logger, boost::asio::io_context& io_ctx,
const std::uint64_t body_limit) :
32 m_logger(std::move(logger)),
33 m_resolver(boost::asio::make_strand(io_ctx))
37 throw std::invalid_argument(
"no valid logger provided.");
// Apply the caller-supplied body size limit to the response parser.
40 m_parser.body_limit(body_limit);
// Fragment of the function that kicks off a request (its name and most of
// its parameter list are not visible in this excerpt).
// err_channel is the promise through which error codes are reported.
48 std::promise<malloy::error_code> err_channel,
// Take ownership of everything that the later completion handlers use.
53 m_req_filter = std::move(filter);
54 m_req = std::move(req);
55 m_err_channel = std::move(err_channel);
56 m_cb.emplace(std::move(cb));
// Start asynchronous name resolution using the request's Host header field.
59 m_resolver.async_resolve(
60 m_req.base()[malloy::http::field::host],
// Completion is delivered to on_resolve; a shared_ptr obtained from the
// derived object is bound into the handler (presumably to keep the
// connection alive while the operation is pending).
62 boost::beast::bind_front_handler(
63 &connection::on_resolve,
64 derived().shared_from_this()
// Logger used for the trace and error messages emitted by the handlers.
70 std::shared_ptr<spdlog::logger> m_logger;
// Fragment: starts an asynchronous HTTP write (the stream and message
// arguments are not visible in this excerpt). Completion is delivered to
// on_write, with a shared_ptr from the derived object bound into the handler.
76 boost::beast::http::async_write(
79 boost::beast::bind_front_handler(
80 &connection::on_write,
81 derived().shared_from_this()
// Resolver, constructed on a strand of the io_context passed to the constructor.
87 boost::asio::ip::tcp::resolver m_resolver;
// Buffer object; its uses are not visible in this excerpt (presumably the
// read buffer for the response - confirm).
88 boost::beast::flat_buffer m_buffer;
// Parser with an empty body type; later moved into a parser for the body
// type selected by the request filter.
89 boost::beast::http::response_parser<boost::beast::http::empty_body> m_parser;
// Promise through which the handlers publish their error codes.
91 std::promise<malloy::error_code> m_err_channel;
// Callback storage; filled via emplace() when a request is started.
92 std::optional<callback_t> m_cb;
// Downcast to the inheriting type named by the Derived template parameter.
99 return static_cast<Derived&
>(*this);
// Completion handler for the asynchronous name resolution.
//   ec      - result of the resolve operation.
//   results - resolver results (presumably passed on to async_connect; the
//             argument line is not visible in this excerpt).
103 on_resolve(
const boost::beast::error_code& ec, boost::asio::ip::tcp::resolver::results_type results)
105 m_logger->trace(
"on_resolve()");
// Error reporting: log the message and publish the code on the promise.
// NOTE(review): the guarding condition and any early return are not visible
// here; presumably this only runs when ec holds an error.
108 m_logger->error(
"on_resolve: {}", ec.message());
109 m_err_channel.set_value(ec);
// Arm a 30 second expiry on the lowest layer of the derived stream.
114 boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
// Start the asynchronous connect; completion is delivered to on_connect.
117 boost::beast::get_lowest_layer(derived().stream()).async_connect(
119 boost::beast::bind_front_handler(
120 &connection::on_connect,
121 derived().shared_from_this()
// Completion handler for the asynchronous connect.
//   ec - result of the connect operation.
//   The second (unnamed) parameter is the endpoint type and is not used.
127 on_connect(
const boost::beast::error_code& ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type)
129 m_logger->trace(
"on_connect()");
// Error reporting: log the message and publish the code on the promise.
// NOTE(review): the guarding condition and any early return are not visible
// here; presumably this only runs when ec holds an error.
132 m_logger->error(
"on_connect: {}", ec.message());
133 m_err_channel.set_value(ec);
// Re-arm the 30 second expiry on the lowest layer of the derived stream.
138 boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
// Hand control to the derived class now that the connection is established.
141 derived().hook_connected();
// Completion handler for the asynchronous HTTP write.
//   ec                - result of the write operation.
//   bytes_transferred - marked [[maybe_unused]].
145 on_write(
const boost::beast::error_code& ec, [[maybe_unused]]
const std::size_t bytes_transferred)
// Error reporting: log the message and publish the code on the promise.
// NOTE(review): the guarding condition and any early return are not visible
// here; presumably this only runs when ec holds an error.
148 m_logger->error(
"on_write: {}", ec.message());
149 m_err_channel.set_value(ec);
// Read the response header; completion is delivered to on_read_header.
// Note that this call site uses malloy::bind_front_handler, whereas the other
// handlers in this file are bound with boost::beast::bind_front_handler.
154 boost::beast::http::async_read_header(
158 malloy::bind_front_handler(
159 &connection::on_read_header,
160 derived().shared_from_this()
// Body of the header-read completion handler (the signature is not visible
// in this excerpt; the name is taken from the log message below).
// Error reporting: log the message and publish the code on the promise.
// NOTE(review): the guarding condition and any early return are not visible
// here; presumably this only runs when ec holds an error.
169 m_logger->error(
"on_read_header: '{}'", ec.message());
170 m_err_channel.set_value(ec);
// Ask the request filter which body type(s) to use for this response header.
175 auto bodies = m_req_filter.body_for(m_parser.get().base());
// Visit the selected body alternative (the visited argument is not visible
// here; presumably `bodies`).
176 std::visit([
this](
auto&& body) {
177 using body_t = std::decay_t<
decltype(body)>;
// Build a parser for the concrete body type from the existing parser, which
// is moved from and must not be reused afterwards.
179 auto parser = std::make_shared<boost::beast::http::response_parser<body_t>>(std::move(m_parser));
// Let the filter prepare the body object before the rest of the message is read.
180 m_req_filter.setup_body(parser->get().base(), parser->get().body());
// Read the rest of the message. The completion lambda captures `parser` and
// a shared_ptr to the derived object (`me`) by value.
182 boost::beast::http::async_read(
186 [
this, parser, me = derived().shared_from_this()](
auto ec,
auto) {
// Error reporting for the read (guarding condition not visible here).
188 m_logger->error(
"on_read(): {}", ec.message());
189 m_err_channel.set_value(ec);
// Fragment: shut down both directions of the TCP socket underneath the
// derived stream; the outcome is written to `ec` (declared outside this excerpt).
207 boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
// A not_connected result is not logged; any other error is.
210 if (ec && ec != boost::beast::errc::not_connected)
211 m_logger->error(
"shutdown: {}", ec.message());
// Publish the shutdown result on the promise.
// NOTE(review): the other handlers also call set_value on this promise, and a
// std::promise may only be satisfied once - confirm no path reaches both.
213 m_err_channel.set_value(ec);
Definition: connection.hpp:26
Definition: request.hpp:19
Definition: response.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9