Malloy
Loading...
Searching...
No Matches
stream.hpp
1#pragma once
2
3#include "../tcp/stream.hpp"
4#include "../type_traits.hpp"
5
6#include <boost/beast/core.hpp>
7#include <boost/asio/connect.hpp>
8#include <boost/asio/ip/tcp.hpp>
9#include <boost/beast/websocket/stream.hpp>
10#include <boost/beast/websocket/error.hpp>
11
12#if MALLOY_FEATURE_TLS
13 #include <boost/beast/ssl.hpp>
14 #include <boost/asio/ssl/stream.hpp>
15#endif
16
17#include <variant>
18
19namespace malloy::websocket
20{
21
22 namespace detail
23 {
24
25#if MALLOY_FEATURE_TLS
26 using tls_stream = boost::beast::websocket::stream<
27 boost::beast::ssl_stream<malloy::tcp::stream<>>
28 >;
29#endif
30 using websocket_t = std::variant<
31#if MALLOY_FEATURE_TLS
32 tls_stream,
33#endif
34 boost::beast::websocket::stream<malloy::tcp::stream<>>
35 >;
36 template<typename T>
37 concept rw_completion_token = boost::asio::completion_token_for<T, void(malloy::error_code, std::size_t)>;
38 }
39
49 class stream
50 {
51 using ws_t = detail::websocket_t;
52
53 public:
54 explicit
55 stream(detail::websocket_t&& ws) :
56 m_underlying_conn{ std::move(ws) }
57 {
58 }
59
60 explicit
61 stream(boost::beast::websocket::stream<malloy::tcp::stream<>>&& s) :
62 m_underlying_conn{ std::move(s) }
63 {
64 }
65
66 explicit
68 stream{boost::beast::websocket::stream<malloy::tcp::stream<>>{std::move(from)}}
69 {
70 }
71
72#if MALLOY_FEATURE_TLS
73 explicit
74 stream(detail::tls_stream&& ws) :
75 m_underlying_conn{std::move(ws)}
76 {
77 }
78
79 explicit
80 stream(boost::beast::ssl_stream<malloy::tcp::stream<>>&& from) :
81 stream{malloy::websocket::detail::tls_stream{
82 boost::beast::websocket::stream<
83 boost::beast::ssl_stream<malloy::tcp::stream<>>
84 >{std::move(from)}}}
85 {
86 }
87#endif
88
89 template<concepts::const_buffer_sequence Buff, detail::rw_completion_token Callback>
90 auto
91 async_write(const Buff& buffers, Callback&& done)
92 {
93 return std::visit([&buffers, done = std::forward<Callback>(done)](auto& stream) mutable { return stream.async_write(buffers, std::forward<Callback>(done)); }, m_underlying_conn);
94 }
95
96 template<concepts::const_buffer_sequence Buff>
97 auto
98 write(const Buff& buffers) -> std::size_t
99 {
100 return std::visit([&buffers](auto& stream) mutable { return stream.write(buffers); }, m_underlying_conn);
101 }
102
103 template<concepts::dynamic_buffer Buff, detail::rw_completion_token Callback>
104 auto
105 async_read(Buff& buff, Callback&& done)
106 {
107 return std::visit([&buff, done = std::forward<Callback>(done)](auto& s) mutable { return s.async_read(buff, std::forward<Callback>(done)); }, m_underlying_conn);
108 }
109
110 template<concepts::dynamic_buffer Buff>
111 auto
112 read(Buff& buff, boost::beast::error_code& ec) -> std::size_t
113 {
114 return std::visit([&buff, &ec](auto& s) mutable { return s.read(buff, ec); }, m_underlying_conn);
115 }
116
117 template<boost::asio::completion_token_for<void(malloy::error_code)> CompletionToken>
118 auto
119 async_close(boost::beast::websocket::close_reason why, CompletionToken&& done) {
120 return std::visit([why, done = std::forward<CompletionToken>(done)](auto& s) mutable {
121 return s.async_close(why, std::forward<CompletionToken>(done));
122 }, m_underlying_conn);
123 }
124
125 void
126 set_option(auto&& opt)
127 {
128 std::visit([opt = std::forward<decltype(opt)>(opt)](auto& s) mutable { s.set_option(std::forward<decltype(opt)>(opt)); }, m_underlying_conn);
129 }
130
131 template<typename Body, typename Fields, boost::asio::completion_token_for<void(malloy::error_code)> CompletionHandler>
132 auto
133 async_accept(const boost::beast::http::request<Body, Fields>& req, CompletionHandler&& done)
134 {
135 return std::visit([req, done = std::forward<decltype(done)>(done)](auto& s) mutable { return s.async_accept(req, std::forward<decltype(done)>(done)); }, m_underlying_conn);
136 }
137
138 template<typename Body, typename Fields>
139 auto
140 accept(const boost::beast::http::request<Body, Fields>& req) -> boost::beast::error_code
141 {
142 return std::visit([req](auto& s) { return s.accept(req); }, m_underlying_conn);
143 }
144
145 template<boost::asio::completion_token_for<void(malloy::error_code)> Callback>
146 auto
147 async_handshake(std::string host, std::string target, Callback&& done)
148 {
149 return std::visit([host = std::move(host), target = std::move(target), done = std::forward<Callback>(done)](auto& s) mutable {
150 return s.async_handshake(host, target, std::forward<Callback>(done));
151 }, m_underlying_conn);
152 }
153
161 void
162 set_binary(const bool enabled)
163 {
164 std::visit([enabled](auto& s) mutable {
165 s.binary(enabled);
166 },
167 m_underlying_conn
168 );
169 }
170
177 [[nodiscard]]
178 bool
179 binary() const
180 {
181 return std::visit([](auto& s) {
182 return s.binary();
183 },
184 m_underlying_conn
185 );
186 }
187
198 template<typename Func>
199 void
200 get_lowest_layer(Func&& visitor)
201 {
202 std::visit([visitor = std::forward<Func>(visitor)](auto& s) mutable { visitor(boost::beast::get_lowest_layer(s)); }, m_underlying_conn);
203 }
204
210 auto
212 {
213 return std::visit([](auto& s) { return s.get_executor(); }, m_underlying_conn);
214 }
215
222 bool
223 is_open() const
224 {
225 return std::visit([](auto& s){ return s.is_open(); }, m_underlying_conn);
226 }
227
232 constexpr
233 bool
234 is_tls() const
235 {
236#if MALLOY_FEATURE_TLS
237 return std::holds_alternative<detail::tls_stream>(m_underlying_conn);
238#else
239 return false;
240#endif
241 }
242
243#if MALLOY_FEATURE_TLS
244 template<concepts::accept_handler Callback>
245 void
246 async_handshake_tls(boost::asio::ssl::stream_base::handshake_type type, Callback&& done)
247 {
248 if (!is_tls())
249 throw std::logic_error{"async_handshake_tls called on non-tls stream"};
250
251 std::visit(
252 [done = std::forward<Callback>(done), type](auto& s) mutable {
253 if constexpr (std::same_as<std::decay_t<decltype(s)>, detail::tls_stream>)
254 s.next_layer().async_handshake(type, std::forward<Callback>(done));
255 },
256 m_underlying_conn
257 );
258 }
259 #endif
260
261 private:
262 ws_t m_underlying_conn;
263 };
264
265}
Websocket stream. May use TLS.
Definition: stream.hpp:50
bool binary() const
Checks whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:179
void get_lowest_layer(Func &&visitor)
Access get_lowest_layer of wrapped stream type.
Definition: stream.hpp:200
void set_binary(const bool enabled)
Controls whether outgoing message will be indicated text or binary.
Definition: stream.hpp:162
bool is_open() const
Returns true if the stream is open.
Definition: stream.hpp:223
auto get_executor()
Get executor of the underlying stream.
Definition: stream.hpp:211
constexpr bool is_tls() const
Whether the underlying stream is TLS or not.
Definition: stream.hpp:234
boost::beast::basic_stream< boost::asio::ip::tcp, boost::asio::any_io_executor, RatePolicy > stream
Definition: stream.hpp:22
Definition: connection.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9