Malloy
Loading...
Searching...
No Matches
stream.hpp
1#pragma once
2
3#include "../tcp/stream.hpp"
4#include "../type_traits.hpp"
5
6#include <boost/beast/core.hpp>
7#include <boost/asio/connect.hpp>
8#include <boost/asio/ip/tcp.hpp>
9#include <boost/beast/websocket/stream.hpp>
10#include <boost/beast/websocket/error.hpp>
11
12#if MALLOY_FEATURE_TLS
13 #include <boost/beast/ssl.hpp>
14 #include <boost/asio/ssl/stream.hpp>
15#endif
16
17#include <variant>
18
19namespace malloy::websocket
20{
21
22 namespace detail
23 {
24
25#if MALLOY_FEATURE_TLS
26 using tls_stream = boost::beast::websocket::stream<
27 boost::beast::ssl_stream<malloy::tcp::stream<>>
28 >;
29#endif
30 using websocket_t = std::variant<
31#if MALLOY_FEATURE_TLS
32 tls_stream,
33#endif
34 boost::beast::websocket::stream<malloy::tcp::stream<>>
35 >;
36 template<typename T>
37 concept rw_completion_token = boost::asio::completion_token_for<T, void(malloy::error_code, std::size_t)>;
38 }
39
49 class stream
50 {
51 using ws_t = detail::websocket_t;
52
53 public:
54 explicit
55 stream(detail::websocket_t&& ws) :
56 m_underlying_conn{ std::move(ws) }
57 {
58 }
59
60 explicit
61 stream(boost::beast::websocket::stream<malloy::tcp::stream<>>&& s) :
62 m_underlying_conn{ std::move(s) }
63 {
64 }
65
66 explicit
68 stream{boost::beast::websocket::stream<malloy::tcp::stream<>>{std::move(from)}}
69 {
70 }
71
72#if MALLOY_FEATURE_TLS
73 explicit
74 stream(detail::tls_stream&& ws) :
75 m_underlying_conn{std::move(ws)}
76 {
77 }
78
79 explicit
80 stream(boost::beast::ssl_stream<malloy::tcp::stream<>>&& from) :
81 stream{malloy::websocket::detail::tls_stream{
82 boost::beast::websocket::stream<
83 boost::beast::ssl_stream<malloy::tcp::stream<>>
84 >{std::move(from)}}}
85 {
86 }
87#endif
88
89 template<concepts::const_buffer_sequence Buff, detail::rw_completion_token Callback>
90 auto
91 async_write(const Buff& buffers, Callback&& done)
92 {
93 return std::visit(
94 [&buffers, done = std::forward<Callback>(done)](auto& stream) mutable {
95 return stream.async_write(buffers, std::forward<Callback>(done));
96 },
97 m_underlying_conn
98 );
99 }
100
101 template<concepts::const_buffer_sequence Buff>
102 auto
103 write(const Buff& buffers) -> std::size_t
104 {
105 return std::visit(
106 [&buffers](auto& stream) mutable {
107 return stream.write(buffers);
108 },
109 m_underlying_conn
110 );
111 }
112
113 template<concepts::dynamic_buffer Buff, detail::rw_completion_token Callback>
114 auto
115 async_read(Buff& buff, Callback&& done)
116 {
117 return std::visit(
118 [&buff, done = std::forward<Callback>(done)](auto& s) mutable {
119 return s.async_read(buff, std::forward<Callback>(done));
120 },
121 m_underlying_conn
122 );
123 }
124
125 template<concepts::dynamic_buffer Buff>
126 auto
127 read(Buff& buff, boost::beast::error_code& ec) -> std::size_t
128 {
129 return std::visit(
130 [&buff, &ec](auto& s) mutable {
131 return s.read(buff, ec);
132 },
133 m_underlying_conn
134 );
135 }
136
137 template<boost::asio::completion_token_for<void(malloy::error_code)> CompletionToken>
138 auto
139 async_close(boost::beast::websocket::close_reason why, CompletionToken&& done)
140 {
141 return std::visit(
142 [why, done = std::forward<CompletionToken>(done)](auto& s) mutable {
143 return s.async_close(why, std::forward<CompletionToken>(done));
144 },
145 m_underlying_conn
146 );
147 }
148
149 void
150 set_option(auto&& opt)
151 {
152 std::visit(
153 [opt = std::forward<decltype(opt)>(opt)](auto& s) mutable {
154 s.set_option(std::forward<decltype(opt)>(opt));
155 },
156 m_underlying_conn
157 );
158 }
159
160 template<typename Body, typename Fields, boost::asio::completion_token_for<void(malloy::error_code)> CompletionHandler>
161 auto
162 async_accept(const boost::beast::http::request<Body, Fields>& req, CompletionHandler&& done)
163 {
164 return std::visit(
165 [req, done = std::forward<decltype(done)>(done)](auto& s) mutable {
166 return s.async_accept(req, std::forward<decltype(done)>(done));
167 },
168 m_underlying_conn
169 );
170 }
171
172 template<typename Body, typename Fields>
173 auto
174 accept(const boost::beast::http::request<Body, Fields>& req) -> boost::beast::error_code
175 {
176 return std::visit(
177 [req](auto& s) {
178 return s.accept(req);
179 },
180 m_underlying_conn
181 );
182 }
183
184 template<boost::asio::completion_token_for<void(malloy::error_code)> Callback>
185 auto
186 async_handshake(std::string host, std::string target, Callback&& done)
187 {
188 return std::visit(
189 [host = std::move(host), target = std::move(target), done = std::forward<Callback>(done)](auto& s) mutable {
190 return s.async_handshake(host, target, std::forward<Callback>(done));
191 },
192 m_underlying_conn
193 );
194 }
195
203 void
204 set_binary(const bool enabled)
205 {
206 std::visit(
207 [enabled](auto& s) mutable {
208 s.binary(enabled);
209 },
210 m_underlying_conn
211 );
212 }
213
220 [[nodiscard]]
221 bool
222 binary() const
223 {
224 return std::visit(
225 [](auto& s) {
226 return s.binary();
227 },
228 m_underlying_conn
229 );
230 }
231
242 template<typename Func>
243 void
244 get_lowest_layer(Func&& visitor)
245 {
246 std::visit(
247 [visitor = std::forward<Func>(visitor)](auto& s) mutable {
248 visitor(boost::beast::get_lowest_layer(s));
249 },
250 m_underlying_conn
251 );
252 }
253
259 auto
261 {
262 return std::visit(
263 [](auto& s) {
264 return s.get_executor();
265 },
266 m_underlying_conn
267 );
268 }
269
276 bool
277 is_open() const
278 {
279 return std::visit(
280 [](auto& s){
281 return s.is_open();
282 },
283 m_underlying_conn
284 );
285 }
286
291 constexpr
292 bool
293 is_tls() const
294 {
295#if MALLOY_FEATURE_TLS
296 return std::holds_alternative<detail::tls_stream>(m_underlying_conn);
297#else
298 return false;
299#endif
300 }
301
302#if MALLOY_FEATURE_TLS
303 template<concepts::accept_handler Callback>
304 void
305 async_handshake_tls(boost::asio::ssl::stream_base::handshake_type type, Callback&& done)
306 {
307 if (!is_tls())
308 throw std::logic_error{"async_handshake_tls called on non-tls stream"};
309
310 std::visit(
311 [done = std::forward<Callback>(done), type](auto& s) mutable {
312 if constexpr (std::same_as<std::decay_t<decltype(s)>, detail::tls_stream>)
313 s.next_layer().async_handshake(type, std::forward<Callback>(done));
314 },
315 m_underlying_conn
316 );
317 }
318 #endif
319
320 private:
321 ws_t m_underlying_conn;
322 };
323
324}
Websocket stream. May use TLS.
Definition: stream.hpp:50
bool binary() const
Checks whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:222
void get_lowest_layer(Func &&visitor)
Access get_lowest_layer of wrapped stream type.
Definition: stream.hpp:244
void set_binary(const bool enabled)
Controls whether outgoing message will be indicated text or binary.
Definition: stream.hpp:204
bool is_open() const
Returns true if the stream is open.
Definition: stream.hpp:277
auto get_executor()
Get executor of the underlying stream.
Definition: stream.hpp:260
constexpr bool is_tls() const
Whether the underlying stream is TLS or not.
Definition: stream.hpp:293
boost::beast::basic_stream< boost::asio::ip::tcp, boost::asio::any_io_executor, RatePolicy > stream
Definition: stream.hpp:22
Definition: connection.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9