#pragma once #include #include #include #include #include #include namespace _details { template using optional = std::optional; //< Configuration constant on which optional to use to implement the rest of this file auto nullopt = std::nullopt; //< Configuration constant on which optional to use to implement the rest of this file struct awaiter { void operator()(int duration) { std::this_thread::sleep_for(std::chrono::milliseconds(duration)); } }; struct defer_impl { std::function fn; ~defer_impl() { fn(); } }; #ifdef defer #warning "A defer macro is already defined and will be undefined after this file" #undef defer #endif #define defer(name, h) _details::defer_impl _defer_slice ## name {[&](){(h);}}; struct _do_not_use_t { }; constexpr _do_not_use_t do_not_use{}; } namespace container { template class circular_buffer_iterator { struct M { T *buffer; size_t capacity; size_t index; }; M m; explicit circular_buffer_iterator(M members) : m{members} {} public: using difference_type = size_t; using value_type = T; using pointer = T *; using reference = T &; using iterator_category = std::bidirectional_iterator_tag; circular_buffer_iterator(T *buffer, size_t capacity, size_t index) : m{M{ .buffer = buffer, .capacity = capacity, .index = index }} {} circular_buffer_iterator &operator++() { m.index = (m.index + 1) bitand (m.capacity - 1); return *this; } circular_buffer_iterator operator++(int) { M oth = m; m.index = (m.index + 1) bitand (m.capacity - 1); return circular_buffer_iterator{oth}; } circular_buffer_iterator &operator--() { m.index = (m.index - 1) bitand (m.capacity - 1); return *this; } circular_buffer_iterator operator--(int) { M oth = m; m.index = (m.index - 1) bitand (m.capacity - 1); return circular_buffer_iterator{oth}; } T &operator*() { return m.buffer[m.index]; } bool operator==(const circular_buffer_iterator &oth) { return m == oth.m; } bool operator!=(const circular_buffer_iterator &oth) { return m.buffer != oth.m.buffer or m.index != oth.m.index or m.capacity != oth.m.capacity; } }; template class circular_buffer { struct M { std::unique_ptr buffer; size_t capacity; size_t start; size_t end; }; M m; M null_members() { return M{ .buffer = nullptr, .capacity = 0, .start = 0, .end = 0 }; } static size_t validate_capacity(size_t new_capacity) { if (new_capacity < 2) return 2; if ((new_capacity bitand (new_capacity - 1))) { new_capacity = new_capacity & (new_capacity >> 1); new_capacity = new_capacity & (new_capacity >> 2); new_capacity = new_capacity & (new_capacity >> 4); new_capacity = new_capacity & (new_capacity >> 8); new_capacity = new_capacity & (new_capacity >> 16); new_capacity = new_capacity & (new_capacity >> 31); return new_capacity + 1; } return new_capacity; } explicit circular_buffer(M s) : m(std::move(s)) {} public: circular_buffer() : m{null_members()} {} circular_buffer(const circular_buffer &oth) : m(null_members()) { if (oth.m.capacity == 0) return; reserve(oth.m.capacity); for (const auto &elem: oth) { push_back(elem); } } circular_buffer(circular_buffer &&oth) noexcept : m{std::exchange(oth.m, null_members())} {} circular_buffer &operator=(const circular_buffer &oth) { if (this == &oth) return *this; while (pop_front()); reserve(oth.m.capacity); for (const auto &elem: oth) { push_back(elem); } }; circular_buffer &operator=(circular_buffer &&oth) noexcept { std::swap(m, oth.m); } circular_buffer_iterator begin() { return circular_buffer_iterator(reinterpret_cast(m.buffer.get()), m.capacity, m.start); } circular_buffer_iterator end() { return circular_buffer_iterator(reinterpret_cast(m.buffer.get()), m.capacity, m.end); } circular_buffer_iterator begin() const { return circular_buffer_iterator{ m.buffer.get(), m.capacity, m.start }; } circular_buffer_iterator end() const { return circular_buffer_iterator{ m.buffer.get(), m.capacity, m.end }; } void reserve(size_t new_capacity) { new_capacity = validate_capacity(new_capacity); if (new_capacity <= m.capacity) return; circular_buffer temp{M{ .buffer = std::make_unique(new_capacity * sizeof(T)), .capacity = new_capacity, .start = 0, .end = 0 }}; _details::optional value = pop_front(); while (value) { temp.push_back(std::move(value.value())); value = pop_front(); } std::swap(m, temp.m); } void push_front(T value) { if (m.capacity == 0) reserve(2); size_t new_start = (m.start - 1) bitand (m.capacity - 1); if (new_start == m.end) { reserve(m.capacity << 1); } new_start = (m.start - 1) bitand (m.capacity - 1); ::new(reinterpret_cast(m.buffer.get()) + new_start) T(value); m.start = new_start; } _details::optional pop_front() noexcept { if (m.start == m.end) return _details::nullopt; _details::optional ret = std::move(*begin()); (*begin()).~T(); m.start = (m.start + 1) bitand (m.capacity - 1); return ret; } void push_back(T value) { if (m.capacity == 0) reserve(2); size_t new_end = (m.end + 1) bitand (m.capacity - 1); if (new_end == m.start) { reserve(m.capacity << 1); } new_end = (m.end + 1) bitand (m.capacity - 1); ::new(reinterpret_cast(m.buffer.get()) + m.end) T(value); m.end = new_end; } _details::optional pop_back() noexcept { if (m.start == m.end) return _details::nullopt; auto last = --end(); _details::optional ret = std::move(*last); (*last).~T(); m.end = (m.end - 1) bitand (m.capacity - 1); return ret; } bool empty() const { return m.start == m.end; } ~circular_buffer() { while (not empty()) { pop_front(); } } }; } namespace _details { template class channel_impl { mutable std::mutex lock; container::circular_buffer data; int active_writer = 0; int active_reader = 0; public: struct reader_t{}; struct writer_t{}; static constexpr reader_t reader{}; static constexpr writer_t writer{}; void close(reader_t) { std::lock_guard d{lock}; active_reader -= 1; } void close(writer_t) { std::lock_guard d{lock}; active_writer -=1; } void register_one(reader_t, _details::_do_not_use_t) { std::lock_guard d{lock}; active_reader += 1; } void register_one(writer_t, _details::_do_not_use_t) { std::lock_guard d{lock}; active_writer += 1; } bool closed(reader_t) { std::lock_guard d{lock}; return active_reader == 0; } bool closed(writer_t) { std::lock_guard d{lock}; return active_writer == 0; } bool push(const T& elem) { std::lock_guard d{lock}; if(active_reader == 0) return true; if(active_writer == 0) return false; data.push_front(elem); return true; } bool push(T&& elem) { std::lock_guard d{lock}; if(active_reader == 0) return true; if(active_writer == 0) return false; data.push_front(elem); return true; } _details::optional pop() { std::lock_guard d{lock}; if(active_reader == 0) return _details::nullopt; return data.pop_back(); } bool empty() const { return active_writer == 0 && data.empty(); } }; } /** * This is a read only channel of its specified template parameter type * @tparam T The type of element that is transmitted through the channel */ template class channel_r { std::shared_ptr<_details::channel_impl> impl; public: /** * This constructor should **NOT** be used */ channel_r(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl> _impl) : impl(_impl) { impl->register_one(_details::channel_impl::reader, _details::do_not_use); } /** * Copy constructor for channel receiver */ channel_r(const channel_r& oth) { if(oth.impl.get() == impl.get()) return; impl = oth.impl; if(impl) impl->register_one(_details::channel_impl::reader, _details::do_not_use); } /** * Obtains a value from the queue * @return either a value if one is available in the queue, or no value */ _details::optional pop() { return impl->pop(); } /** * Verifies if the queue is empty and has no sender available * @return true if the queue will never contain new elements * @return false if the queue may at some point in the future contain new elements */ bool empty() const { return impl->empty(); } ~channel_r() { impl->close(_details::channel_impl::reader); } /** * Verifies if the writer side is still open * @return true if the writer side is closed * @return false if the writer side is still open */ bool closed() { return impl->closed(_details::channel_impl::writer); } }; /** * This is a write only channel of its specified template parameter type * @tparam T The type of element that is transmitted through the channel */ template class channel_w { std::shared_ptr<_details::channel_impl> impl; public: /** * This constructor should **NOT** be used */ channel_w(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl> _impl) : impl(_impl) { impl->register_one(_details::channel_impl::writer, _details::do_not_use); } /** * Copy constructor for channel sender */ channel_w(const channel_w& oth) { if(oth.impl.get() == impl.get()) return; impl = oth.impl; if(impl) impl->register_one(_details::channel_impl::writer, _details::do_not_use); } /** * Sends an element through the channel * @param elem the element to send * @return true is the element was successfully pushed */ bool push(const T& elem) { return impl->push(elem); } /** * Sends an element through the channel * @param elem the element to send * @return true is the element was successfully pushed */ bool push(T&& elem) { return impl->push(std::forward(elem)); } ~channel_w() { impl->close(_details::channel_impl::writer); } /** * Verifies if the reader side is still open * @return true if the reader side is closed * @return false if the reader side is still open */ bool closed() { return impl->closed(_details::channel_impl::reader); } }; /** * Creates a pipe made of a reader channel and a writer channel allowing to transmit objects of the specified type between units of concurrency * @tparam T The type to allow transfers of * @return a pair `[writer, reader]` that are connected */ template std::pair, channel_r> make_pipe() { auto shared_ch = std::make_shared<_details::channel_impl>(); return {{_details::do_not_use, shared_ch}, {_details::do_not_use, shared_ch}}; } template class promise; /** * This class allows instantiation and manipulation of the receiving side of a promise * @tparam T The type that will be received * @tparam awaiter A type that has an `operator()(int)` that represent how this object must wait */ template class future { channel_r conduit; friend class promise; future(channel_r&& c) : conduit(c) {}; public: /** * Waits for the future to be set */ void wait() { for(;;) { if (conduit.closed()) return; awaiter{}(30); } } /** * Obtains the received value. * * @note This method should never be called before wait, and should not be called more than once * @return */ _details::optional get() { return conduit.pop(); } }; /** * This class allows to represent waiting for a value from one unit of concurrency into another one. * @tparam T The type of the value that will be transmitted * @tparam awaiter A type that has an `operator()(int)` that represent how `future`s created by this object must wait */ template class promise { _details::optional> conduit; _details::optional> future_conduit; public: /** * Initializes a valid promise */ promise() { auto pipe = make_pipe(); conduit = std::move(pipe.first); future_conduit = std::move(pipe.second); } /** * Obtains the receiving end of this promise * * @note This method should never be called more than once * @return A future that can be awaited from */ future get_future() { defer(cleanup, future_conduit = _details::nullopt); return future { std::move(future_conduit.value()) }; } /** * Sets the value of the future to the provided value * * @note This method and methods sharing the same name should never be called more than once * @param value The value to transmit */ void set_value(T&& value) { conduit.value().push(std::forward(value)); conduit = _details::nullopt; } /** * Sets the value of the future to the provided value * * @note This method and methods sharing the same name should never be called more than once * @param value The value to transmit */ void set_value(const T& value) { conduit.value().push(value); conduit = _details::nullopt; } }; #undef defer