From 3679905baa520ce49ef312a8972c2eebe08842e8 Mon Sep 17 00:00:00 2001 From: Archivist Date: Tue, 23 Jul 2024 13:59:37 +0000 Subject: [PATCH] Add 'channels.hpp' --- channels.hpp | 534 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 534 insertions(+) create mode 100644 channels.hpp diff --git a/channels.hpp b/channels.hpp new file mode 100644 index 0000000..d7051f9 --- /dev/null +++ b/channels.hpp @@ -0,0 +1,534 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +namespace _details { + template + using optional = std::optional; //< Configuration constant on which optional to use to implement the rest of this file + auto nullopt = std::nullopt; //< Configuration constant on which optional to use to implement the rest of this file + + struct awaiter { + void operator()(int duration) { + std::this_thread::sleep_for(std::chrono::milliseconds(duration)); + } + }; + + struct defer_impl { + std::function fn; + + ~defer_impl() { + fn(); + } + }; + +#ifdef defer +#warning "A defer macro is already defined and will be undefined after this file" +#undef defer +#endif +#define defer(name, h) _details::defer_impl _defer_slice ## name {[&](){(h);}}; + + struct _do_not_use_t { + }; + constexpr _do_not_use_t do_not_use{}; +} + +namespace container { + + template + class circular_buffer_iterator { + struct M { + T *buffer; + size_t capacity; + size_t index; + }; + + M m; + + explicit circular_buffer_iterator(M members) + : m{members} {} + + public: + using difference_type = size_t; + using value_type = T; + using pointer = T *; + using reference = T &; + using iterator_category = std::bidirectional_iterator_tag; + + + circular_buffer_iterator(T *buffer, size_t capacity, size_t index) + : m{M{ + .buffer = buffer, + .capacity = capacity, + .index = index + }} {} + + circular_buffer_iterator &operator++() { + m.index = (m.index + 1) bitand (m.capacity - 1); + return *this; + } + + circular_buffer_iterator operator++(int) { + M oth = m; + m.index = (m.index + 1) bitand (m.capacity - 1); + return circular_buffer_iterator{oth}; + } + + circular_buffer_iterator &operator--() { + m.index = (m.index - 1) bitand (m.capacity - 1); + return *this; + } + + circular_buffer_iterator operator--(int) { + M oth = m; + m.index = (m.index - 1) bitand (m.capacity - 1); + return circular_buffer_iterator{oth}; + } + + T &operator*() { + return m.buffer[m.index]; + } + + bool operator==(const circular_buffer_iterator &oth) { + return m == oth.m; + } + + bool operator!=(const circular_buffer_iterator &oth) { + return m.buffer != oth.m.buffer or m.index != oth.m.index or m.capacity != oth.m.capacity; + } + }; + + template + class circular_buffer { + struct M { + std::unique_ptr buffer; + size_t capacity; + size_t start; + size_t end; + }; + + M m; + + M null_members() { + return M{ + .buffer = nullptr, + .capacity = 0, + .start = 0, + .end = 0 + }; + } + + static size_t validate_capacity(size_t new_capacity) { + if (new_capacity < 2) return 2; + if ((new_capacity bitand (new_capacity - 1))) { + new_capacity = new_capacity & (new_capacity >> 1); + new_capacity = new_capacity & (new_capacity >> 2); + new_capacity = new_capacity & (new_capacity >> 4); + new_capacity = new_capacity & (new_capacity >> 8); + new_capacity = new_capacity & (new_capacity >> 16); + new_capacity = new_capacity & (new_capacity >> 31); + return new_capacity + 1; + } + return new_capacity; + } + + explicit circular_buffer(M s) : m(std::move(s)) {} + + public: + circular_buffer() + : m{null_members()} {} + + circular_buffer(const circular_buffer &oth) + : m(null_members()) { + if (oth.m.capacity == 0) return; + reserve(oth.m.capacity); + for (const auto &elem: oth) { + push_back(elem); + } + } + + circular_buffer(circular_buffer &&oth) noexcept + : m{std::exchange(oth.m, null_members())} {} + + circular_buffer &operator=(const circular_buffer &oth) { + if (this == &oth) return *this; + while (pop_front()); + reserve(oth.m.capacity); + for (const auto &elem: oth) { + push_back(elem); + } + }; + + circular_buffer &operator=(circular_buffer &&oth) noexcept { + std::swap(m, oth.m); + } + + circular_buffer_iterator begin() { + return circular_buffer_iterator(reinterpret_cast(m.buffer.get()), m.capacity, m.start); + } + + circular_buffer_iterator end() { + return circular_buffer_iterator(reinterpret_cast(m.buffer.get()), m.capacity, m.end); + } + + circular_buffer_iterator begin() const { + return circular_buffer_iterator{ + m.buffer.get(), m.capacity, m.start + }; + } + + circular_buffer_iterator end() const { + return circular_buffer_iterator{ + m.buffer.get(), m.capacity, m.end + }; + } + + void reserve(size_t new_capacity) { + new_capacity = validate_capacity(new_capacity); + if (new_capacity <= m.capacity) return; + + circular_buffer temp{M{ + .buffer = std::make_unique(new_capacity * sizeof(T)), + .capacity = new_capacity, + .start = 0, + .end = 0 + }}; + + _details::optional value = pop_front(); + while (value) { + temp.push_back(std::move(value.value())); + value = pop_front(); + } + std::swap(m, temp.m); + } + + void push_front(T value) { + if (m.capacity == 0) reserve(2); + size_t new_start = (m.start - 1) bitand (m.capacity - 1); + if (new_start == m.end) { + reserve(m.capacity << 1); + } + new_start = (m.start - 1) bitand (m.capacity - 1); + ::new(reinterpret_cast(m.buffer.get()) + new_start) T(value); + m.start = new_start; + } + + _details::optional pop_front() noexcept { + if (m.start == m.end) return _details::nullopt; + _details::optional ret = std::move(*begin()); + (*begin()).~T(); + m.start = (m.start + 1) bitand (m.capacity - 1); + return ret; + } + + void push_back(T value) { + if (m.capacity == 0) reserve(2); + size_t new_end = (m.end + 1) bitand (m.capacity - 1); + if (new_end == m.start) { + reserve(m.capacity << 1); + } + new_end = (m.end + 1) bitand (m.capacity - 1); + ::new(reinterpret_cast(m.buffer.get()) + m.end) T(value); + m.end = new_end; + } + + _details::optional pop_back() noexcept { + if (m.start == m.end) return _details::nullopt; + auto last = --end(); + _details::optional ret = std::move(*last); + (*last).~T(); + m.end = (m.end - 1) bitand (m.capacity - 1); + return ret; + } + + bool empty() const { + return m.start == m.end; + } + + ~circular_buffer() { + while (not empty()) { + pop_front(); + } + } + }; +} + +namespace _details { + template + class channel_impl { + mutable std::mutex lock; + container::circular_buffer data; + int active_writer = 0; + int active_reader = 0; + public: + struct reader_t{}; + struct writer_t{}; + static constexpr reader_t reader{}; + static constexpr writer_t writer{}; + void close(reader_t) { + std::lock_guard d{lock}; + active_reader -= 1; + } + void close(writer_t) { + std::lock_guard d{lock}; + active_writer -=1; + } + void register_one(reader_t, _details::_do_not_use_t) { + std::lock_guard d{lock}; + active_reader += 1; + } + void register_one(writer_t, _details::_do_not_use_t) { + std::lock_guard d{lock}; + active_writer += 1; + } + bool closed(reader_t) { + std::lock_guard d{lock}; + return active_reader == 0; + } + bool closed(writer_t) { + std::lock_guard d{lock}; + return active_writer == 0; + } + bool push(const T& elem) { + std::lock_guard d{lock}; + if(active_reader == 0) return true; + if(active_writer == 0) return false; + data.push_front(elem); + return true; + } + bool push(T&& elem) { + std::lock_guard d{lock}; + if(active_reader == 0) return true; + if(active_writer == 0) return false; + data.push_front(elem); + return true; + } + _details::optional pop() { + std::lock_guard d{lock}; + if(active_reader == 0) return _details::nullopt; + return data.pop_back(); + } + bool empty() const { + return active_writer == 0 && data.empty(); + } + }; +} + +/** + * This is a read only channel of its specified template parameter type + * @tparam T The type of element that is transmitted through the channel + */ +template +class channel_r { + std::shared_ptr<_details::channel_impl> impl; +public: + /** + * This constructor should **NOT** be used + */ + channel_r(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl> _impl) + : impl(_impl) { + impl->register_one(_details::channel_impl::reader, _details::do_not_use); + } + + /** + * Copy constructor for channel receiver + */ + channel_r(const channel_r& oth) { + if(oth.impl.get() == impl.get()) return; + impl = oth.impl; + if(impl) impl->register_one(_details::channel_impl::reader, _details::do_not_use); + } + + /** + * Obtains a value from the queue + * @return either a value if one is available in the queue, or no value + */ + _details::optional pop() { + return impl->pop(); + } + + /** + * Verifies if the queue is empty and has no sender available + * @return true if the queue will never contain new elements + * @return false if the queue may at some point in the future contain new elements + */ + bool empty() const { + return impl->empty(); + } + + ~channel_r() { + impl->close(_details::channel_impl::reader); + } + + /** + * Verifies if the writer side is still open + * @return true if the writer side is closed + * @return false if the writer side is still open + */ + bool closed() { + return impl->closed(_details::channel_impl::writer); + } +}; + +/** + * This is a write only channel of its specified template parameter type + * @tparam T The type of element that is transmitted through the channel + */ +template +class channel_w { + std::shared_ptr<_details::channel_impl> impl; +public: + /** + * This constructor should **NOT** be used + */ + channel_w(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl> _impl) + : impl(_impl) { + impl->register_one(_details::channel_impl::writer, _details::do_not_use); + } + + /** + * Copy constructor for channel sender + */ + channel_w(const channel_w& oth) { + if(oth.impl.get() == impl.get()) return; + impl = oth.impl; + if(impl) impl->register_one(_details::channel_impl::writer, _details::do_not_use); + } + + /** + * Sends an element through the channel + * @param elem the element to send + * @return true is the element was successfully pushed + */ + bool push(const T& elem) { + return impl->push(elem); + } + /** + * Sends an element through the channel + * @param elem the element to send + * @return true is the element was successfully pushed + */ + bool push(T&& elem) { + return impl->push(std::forward(elem)); + } + ~channel_w() { + impl->close(_details::channel_impl::writer); + } + + /** + * Verifies if the reader side is still open + * @return true if the reader side is closed + * @return false if the reader side is still open + */ + bool closed() { + return impl->closed(_details::channel_impl::reader); + } +}; + +/** + * Creates a pipe made of a reader channel and a writer channel allowing to transmit objects of the specified type between units of concurrency + * @tparam T The type to allow transfers of + * @return a pair `[writer, reader]` that are connected + */ +template +std::pair, channel_r> make_pipe() { + auto shared_ch = std::make_shared<_details::channel_impl>(); + return {{_details::do_not_use, shared_ch}, {_details::do_not_use, shared_ch}}; +} + +template +class promise; + +/** + * This class allows instantiation and manipulation of the receiving side of a promise + * @tparam T The type that will be received + * @tparam awaiter A type that has an `operator()(int)` that represent how this object must wait + */ +template +class future { + channel_r conduit; + friend class promise; + + future(channel_r&& c) : conduit(c) {}; + +public: + /** + * Waits for the future to be set + */ + void wait() { + for(;;) { + if (conduit.closed()) return; + awaiter{}(30); + } + } + + /** + * Obtains the received value. + * + * @note This method should never be called before wait, and should not be called more than once + * @return + */ + _details::optional get() { + return conduit.pop(); + } +}; + +/** + * This class allows to represent waiting for a value from one unit of concurrency into another one. + * @tparam T The type of the value that will be transmitted + * @tparam awaiter A type that has an `operator()(int)` that represent how `future`s created by this object must wait + */ +template +class promise { + _details::optional> conduit; + _details::optional> future_conduit; +public: + /** + * Initializes a valid promise + */ + promise() { + auto pipe = make_pipe(); + conduit = std::move(pipe.first); + future_conduit = std::move(pipe.second); + } + + /** + * Obtains the receiving end of this promise + * + * @note This method should never be called more than once + * @return A future that can be awaited from + */ + future get_future() { + defer(cleanup, future_conduit = _details::nullopt); + return future { + std::move(future_conduit.value()) + }; + } + + /** + * Sets the value of the future to the provided value + * + * @note This method and methods sharing the same name should never be called more than once + * @param value The value to transmit + */ + void set_value(T&& value) { + conduit.value().push(std::forward(value)); + conduit = _details::nullopt; + } + + /** + * Sets the value of the future to the provided value + * + * @note This method and methods sharing the same name should never be called more than once + * @param value The value to transmit + */ + void set_value(const T& value) { + conduit.value().push(value); + conduit = _details::nullopt; + } +}; + +#undef defer \ No newline at end of file