Malloy
Loading...
Searching...
No Matches
connection.hpp
1#pragma once
2
3#include "request_result.hpp"
4#include "../type_traits.hpp"
5#include "../../core/mp.hpp"
6#include "../../core/type_traits.hpp"
7#include "../../core/http/request.hpp"
8#include "../../core/http/response.hpp"
9#include "../../core/http/type_traits.hpp"
10
11#include <boost/asio/strand.hpp>
12#include <boost/asio.hpp>
13#include <boost/beast/core.hpp>
14#include <spdlog/logger.h>
15
16#include <coroutine>
17#include <future>
18#include <optional>
19
20namespace malloy::client::http
21{
22
28 template<
29 class Derived,
31 concepts::response_filter Filter
32 >
34 {
35 public:
36 connection(std::shared_ptr<spdlog::logger> logger, const std::uint64_t body_limit) :
37 m_logger(std::move(logger))
38 {
39 // Sanity check
40 if (!m_logger)
41 throw std::invalid_argument("no valid logger provided.");
42
43 // Set body limit
44 m_parser.body_limit(body_limit);
45 }
46
47 // Note: We pass by copy as we need to make sure that these members stay alive until the coroutine co_returns.
48 // The consuming application passes in the request and filter via malloy::client::controller::http_request(). If those callables
49 // are for example a lambda, they are being destroyed once the http_request() function returns. http_request() will actually return
50 // before the execution of this run() function completed.
51 // ToDo: Don't deal with ec1, ec2, ec3 and so on. Reuse the same ec instance each time.
52 boost::asio::awaitable< request_result<Filter> >
53 run(
55 Filter filter
56 )
57 {
58 // Get the executor
59 auto executor = co_await boost::asio::this_coro::executor;
60
61 // Look up the domain name
62 auto resolver = boost::asio::ip::tcp::resolver{ executor };
63 const auto [ec1, results] = co_await resolver.async_resolve(
64 req.base()[malloy::http::field::host],
65 std::to_string(req.port()),
66 boost::asio::as_tuple(boost::asio::use_awaitable)
67 );
68 if (ec1)
69 co_return request_result<Filter>{ ec1 };
70
71 // Make the connection on the IP address we get from a lookup
73 set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
74 co_await boost::beast::get_lowest_layer(derived().stream()).async_connect(results, boost::asio::redirect_error(boost::asio::use_awaitable, ec2));
75 if (ec2)
76 co_return request_result<Filter>{ ec2 };
77
78 // Call "connected" hook
79 co_await derived().hook_connected();
80
81 // Send the HTTP request to the remote host
82 set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
83 co_await boost::beast::http::async_write(derived().stream(), req);
84
85 // Pick a body and parse it from the stream
86 // ToDo: Have a look at using boost::beast::http::response<boost::beast::http::dynamic_body> instead!
87 // This would probably involve something like:
88 //
89 // boost::beast::flat_buffer buffer;
90 // http::response<http::dynamic_body> res;
91 // co_await http::async_read(derived().stream(), buffer, res);
92 //
93 auto bodies = filter.body_for(m_parser.get().base());
94 auto resp = co_await std::visit(
95 [&filter, this](auto&& body) -> boost::asio::awaitable< malloy::mp::filter_resp_t<Filter> > {
96 using body_t = std::decay_t<decltype(body)>;
97
98 auto parser = std::make_shared<boost::beast::http::response_parser<body_t>>(std::move(m_parser));
99 filter.setup_body(parser->get().base(), parser->get().body());
100
101 boost::beast::flat_buffer buffer;
102
103 co_await boost::beast::http::async_read(
104 derived().stream(),
105 buffer,
106 *parser,
107 boost::asio::use_awaitable
108 );
109
110 co_return malloy::http::response<body_t>{ parser->release() };
111 },
112 std::move(bodies)
113 );
114
115 // Gracefully close the socket
117 set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
118 // ToDo: This should be co_await too!
119 boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
120
121 // not_connected happens sometimes
122 // so don't bother reporting it.
123 //
124 if (ec && ec != boost::beast::errc::not_connected)
125 m_logger->error("shutdown: {}", ec.message());
126
127 // If we get here then the connection is closed gracefully
128
129 // Prepare the result
130 // ToDo: Optimize this by in-place construction or something like that?
132 result.error_code = std::move(ec);
133 result.response = std::move(resp);
134
135 co_return result;
136 }
137
138 protected:
139 std::shared_ptr<spdlog::logger> m_logger;
140
141 template<typename Rep, typename Period>
142 void
143 set_stream_timeout(const std::chrono::duration<Rep, Period> duration)
144 {
145 // ToDo: Which one is correct?!?!
146
147 //derived().stream().expires_after(duration);
148 boost::beast::get_lowest_layer(derived().stream()).expires_after(duration);
149 }
150
151 private:
152 boost::beast::http::response_parser<boost::beast::http::empty_body> m_parser;
153
154 [[nodiscard]]
155 constexpr
156 Derived&
157 derived() noexcept
158 {
159 return static_cast<Derived&>(*this);
160 }
161 };
162
163}
Definition: connection.hpp:34
Definition: request.hpp:19
constexpr std::uint16_t port() const noexcept
Definition: request.hpp:143
Definition: response.hpp:22
Definition: type_traits.hpp:9
unwrap_variant< to_responses< bodies_for_t< Filter > > > filter_resp_t
Resolves to the type that must be taken in callbacks handling responses for Filter.
Definition: mp.hpp:67
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9
Definition: request_result.hpp:16
malloy::mp::filter_resp_t< Filter > response
Definition: request_result.hpp:30
malloy::error_code error_code
Definition: request_result.hpp:20