Malloy
Loading...
Searching...
No Matches
controller_run_result.hpp
1#pragma once
2
3#include <boost/asio/executor_work_guard.hpp>
4#include <boost/asio/io_context.hpp>
5#include <boost/asio/use_future.hpp>
6#include <spdlog/logger.h>
7
8#include <memory>
9#include <thread>
10#include <stdexcept>
11
12namespace malloy::detail
13{
14
19 {
23 std::size_t num_threads = 1;
24
28 std::shared_ptr<spdlog::logger> logger;
29
30 void
31 validate()
32 {
33 if (!logger)
34 throw std::logic_error{"invalid config: logger is null"};
35 else if (num_threads == 0)
36 throw std::logic_error{"invalid config: cannot have 0 threads"};
37 };
38 };
39
40 template<std::movable T>
42 {
43 public:
51 controller_run_result(const controller_config& cfg, T ctrl, std::unique_ptr<boost::asio::io_context> ioc) :
52 m_io_ctx{std::move(ioc)},
53 m_workguard{m_io_ctx->get_executor()},
54 m_ctrl{std::move(ctrl)}
55 {
56 // Create the I/O context threads
57 m_io_threads.reserve(cfg.num_threads);
58 for (std::size_t i = 0; i < cfg.num_threads; i++) {
59 m_io_threads.emplace_back(
60 [m_io_ctx = m_io_ctx.get()] { // We cannot capture `this` as we may be moved from before this executes
61 assert(m_io_ctx);
62 m_io_ctx->run();
63 });
64 }
65
66 // Log
67 cfg.logger->debug("starting i/o context.");
68 }
69
71 controller_run_result(controller_run_result&&) noexcept = default;
72
73 controller_run_result& operator=(const controller_run_result&) = delete;
74 controller_run_result& operator=(controller_run_result&&) noexcept = default;
75
80 {
81 if (!m_io_ctx)
82 return; // We've been moved
83
84 // Stop the `io_context`. This will cause `run()`
85 // to return immediately, eventually destroying the
86 // `io_context` and all of the sockets in it.
87 m_io_ctx->stop();
88
89 // Tell the workguard that we no longer need it's service
90 m_workguard.reset();
91
92 // Join I/O threads
93 for (auto& thread : m_io_threads)
94 thread.join();
95 }
96
100 void
102 {
103 if (!m_io_ctx)
104 throw std::logic_error{"attempt to call run() on moved from run_result_t"};
105
106 m_workguard.reset();
107 m_io_ctx->run();
108 }
109
110 private:
111 using workguard_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
112
113 std::unique_ptr<boost::asio::io_context> m_io_ctx;
114 workguard_t m_workguard;
115 std::vector<std::thread> m_io_threads;
116 T m_ctrl; // This order matters, the T destructor may need access to something related to the i/o-context
117 };
118
119} // namespace malloy::detail
Definition: controller_run_result.hpp:42
controller_run_result(const controller_config &cfg, T ctrl, std::unique_ptr< boost::asio::io_context > ioc)
Definition: controller_run_result.hpp:51
void run()
Block until all queued async actions completed.
Definition: controller_run_result.hpp:101
Definition: controller_run_result.hpp:19
std::shared_ptr< spdlog::logger > logger
Definition: controller_run_result.hpp:28
std::size_t num_threads
Definition: controller_run_result.hpp:23