|
|
#pragma once
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <new>
#include <optional>
#include <thread>
#include <utility>
-
namespace _details {
    /// Optional implementation used by the rest of this file. Change this alias (together
    /// with `nullopt` below) to build the file on top of a different optional type.
    template<typename T>
    using optional = std::optional<T>;

    /// Empty-state tag matching `optional`.
    /// Declared `inline constexpr` so that every translation unit including this header
    /// shares one definition instead of each defining its own mutable global.
    inline constexpr auto nullopt = std::nullopt;

    /// Default waiting strategy: blocks the calling thread for `duration` milliseconds.
    struct awaiter {
        void operator()(int duration) const {
            std::this_thread::sleep_for(std::chrono::milliseconds(duration));
        }
    };

    /// Scope guard: invokes `fn` when the object is destroyed.
    struct defer_impl {
        std::function<void(void)> fn;

        ~defer_impl() {
            // Calling an empty std::function throws std::bad_function_call, and an
            // exception escaping a destructor terminates the program.
            if (fn) fn();
        }
    };

#ifdef defer
#warning "A defer macro is already defined and will be undefined after this file"
#undef defer
#endif
// defer(name, h): evaluates expression `h` when the enclosing scope exits. `name` only
// serves to build a unique variable name. The macro is undefined again at the end of this file.
#define defer(name, h) _details::defer_impl _defer_slice ## name {[&](){(h);}};

    /// Tag type marking constructors and methods that are reserved for this file.
    struct _do_not_use_t {
    };
    inline constexpr _do_not_use_t do_not_use{};
}
-
- namespace container {
-
/**
 * Bidirectional iterator over a ring of `capacity` slots starting at `buffer`.
 *
 * Stepping past either edge wraps around by masking the index with `capacity - 1`,
 * so `capacity` is expected to be a power of two.
 * @tparam T The element type (may be const-qualified)
 */
template<typename T>
class circular_buffer_iterator {
    struct M {
        T *buffer;
        std::size_t capacity;
        std::size_t index;
    };

    M m;

    explicit circular_buffer_iterator(M members)
            : m{members} {}

    /// Folds a raw position back into the ring.
    std::size_t wrapped(std::size_t position) const {
        return position bitand (m.capacity - 1);
    }

public:
    using difference_type = std::ptrdiff_t; // iterator requirements call for a signed type
    using value_type = T;
    using pointer = T *;
    using reference = T &;
    using iterator_category = std::bidirectional_iterator_tag;

    /// Creates an iterator that refers to no buffer.
    circular_buffer_iterator()
            : m{M{nullptr, 0, 0}} {}

    /**
     * @param buffer First slot of the underlying storage
     * @param capacity Number of slots in the ring
     * @param index Slot this iterator refers to
     */
    circular_buffer_iterator(T *buffer, std::size_t capacity, std::size_t index)
            : m{M{buffer, capacity, index}} {}

    circular_buffer_iterator &operator++() {
        m.index = wrapped(m.index + 1);
        return *this;
    }

    circular_buffer_iterator operator++(int) {
        circular_buffer_iterator previous = *this;
        ++*this;
        return previous;
    }

    circular_buffer_iterator &operator--() {
        // Unsigned wrap-around at index 0 is intended: the mask maps it to the last slot.
        m.index = wrapped(m.index - 1);
        return *this;
    }

    circular_buffer_iterator operator--(int) {
        circular_buffer_iterator previous = *this;
        --*this;
        return previous;
    }

    T &operator*() const {
        return m.buffer[m.index];
    }

    T *operator->() const {
        return m.buffer + m.index;
    }

    /// Two iterators are equal when they refer to the same slot of the same ring.
    bool operator==(const circular_buffer_iterator &oth) const {
        return m.buffer == oth.m.buffer and m.index == oth.m.index and m.capacity == oth.m.capacity;
    }

    bool operator!=(const circular_buffer_iterator &oth) const {
        return not(*this == oth);
    }
};
-
- template<typename T>
- class circular_buffer {
- struct M {
- std::unique_ptr<char[]> buffer;
- size_t capacity;
- size_t start;
- size_t end;
- };
-
- M m;
-
- M null_members() {
- return M{
- .buffer = nullptr,
- .capacity = 0,
- .start = 0,
- .end = 0
- };
- }
-
- static size_t validate_capacity(size_t new_capacity) {
- if (new_capacity < 2) return 2;
- if ((new_capacity bitand (new_capacity - 1))) {
- new_capacity = new_capacity & (new_capacity >> 1);
- new_capacity = new_capacity & (new_capacity >> 2);
- new_capacity = new_capacity & (new_capacity >> 4);
- new_capacity = new_capacity & (new_capacity >> 8);
- new_capacity = new_capacity & (new_capacity >> 16);
- new_capacity = new_capacity & (new_capacity >> 31);
- return new_capacity + 1;
- }
- return new_capacity;
- }
-
- explicit circular_buffer(M s) : m(std::move(s)) {}
-
- public:
- circular_buffer()
- : m{null_members()} {}
-
- circular_buffer(const circular_buffer &oth)
- : m(null_members()) {
- if (oth.m.capacity == 0) return;
- reserve(oth.m.capacity);
- for (const auto &elem: oth) {
- push_back(elem);
- }
- }
-
- circular_buffer(circular_buffer &&oth) noexcept
- : m{std::exchange(oth.m, null_members())} {}
-
- circular_buffer &operator=(const circular_buffer &oth) {
- if (this == &oth) return *this;
- while (pop_front());
- reserve(oth.m.capacity);
- for (const auto &elem: oth) {
- push_back(elem);
- }
- };
-
- circular_buffer &operator=(circular_buffer &&oth) noexcept {
- std::swap(m, oth.m);
- }
-
- circular_buffer_iterator<T> begin() {
- return circular_buffer_iterator<T>(reinterpret_cast<T *>(m.buffer.get()), m.capacity, m.start);
- }
-
- circular_buffer_iterator<T> end() {
- return circular_buffer_iterator<T>(reinterpret_cast<T *>(m.buffer.get()), m.capacity, m.end);
- }
-
- circular_buffer_iterator<const T> begin() const {
- return circular_buffer_iterator<const T>{
- m.buffer.get(), m.capacity, m.start
- };
- }
-
- circular_buffer_iterator<const T> end() const {
- return circular_buffer_iterator<const T>{
- m.buffer.get(), m.capacity, m.end
- };
- }
-
- void reserve(size_t new_capacity) {
- new_capacity = validate_capacity(new_capacity);
- if (new_capacity <= m.capacity) return;
-
- circular_buffer temp{M{
- .buffer = std::make_unique<char[]>(new_capacity * sizeof(T)),
- .capacity = new_capacity,
- .start = 0,
- .end = 0
- }};
-
- _details::optional <T> value = pop_front();
- while (value) {
- temp.push_back(std::move(value.value()));
- value = pop_front();
- }
- std::swap(m, temp.m);
- }
-
- void push_front(T value) {
- if (m.capacity == 0) reserve(2);
- size_t new_start = (m.start - 1) bitand (m.capacity - 1);
- if (new_start == m.end) {
- reserve(m.capacity << 1);
- }
- new_start = (m.start - 1) bitand (m.capacity - 1);
- ::new(reinterpret_cast<T *>(m.buffer.get()) + new_start) T(value);
- m.start = new_start;
- }
-
- _details::optional <T> pop_front() noexcept {
- if (m.start == m.end) return _details::nullopt;
- _details::optional <T> ret = std::move(*begin());
- (*begin()).~T();
- m.start = (m.start + 1) bitand (m.capacity - 1);
- return ret;
- }
-
- void push_back(T value) {
- if (m.capacity == 0) reserve(2);
- size_t new_end = (m.end + 1) bitand (m.capacity - 1);
- if (new_end == m.start) {
- reserve(m.capacity << 1);
- }
- new_end = (m.end + 1) bitand (m.capacity - 1);
- ::new(reinterpret_cast<T *>(m.buffer.get()) + m.end) T(value);
- m.end = new_end;
- }
-
- _details::optional <T> pop_back() noexcept {
- if (m.start == m.end) return _details::nullopt;
- auto last = --end();
- _details::optional <T> ret = std::move(*last);
- (*last).~T();
- m.end = (m.end - 1) bitand (m.capacity - 1);
- return ret;
- }
-
- bool empty() const {
- return m.start == m.end;
- }
-
- ~circular_buffer() {
- while (not empty()) {
- pop_front();
- }
- }
- };
- }
-
- namespace _details {
- template<typename T>
- class channel_impl {
- mutable std::mutex lock;
- container::circular_buffer<T> data;
- int active_writer = 0;
- int active_reader = 0;
- public:
- struct reader_t{};
- struct writer_t{};
- static constexpr reader_t reader{};
- static constexpr writer_t writer{};
- void close(reader_t) {
- std::lock_guard<std::mutex> d{lock};
- active_reader -= 1;
- }
- void close(writer_t) {
- std::lock_guard<std::mutex> d{lock};
- active_writer -=1;
- }
- void register_one(reader_t, _details::_do_not_use_t) {
- std::lock_guard<std::mutex> d{lock};
- active_reader += 1;
- }
- void register_one(writer_t, _details::_do_not_use_t) {
- std::lock_guard<std::mutex> d{lock};
- active_writer += 1;
- }
- bool closed(reader_t) {
- std::lock_guard<std::mutex> d{lock};
- return active_reader == 0;
- }
- bool closed(writer_t) {
- std::lock_guard<std::mutex> d{lock};
- return active_writer == 0;
- }
- bool push(const T& elem) {
- std::lock_guard<std::mutex> d{lock};
- if(active_reader == 0) return true;
- if(active_writer == 0) return false;
- data.push_front(elem);
- return true;
- }
- bool push(T&& elem) {
- std::lock_guard<std::mutex> d{lock};
- if(active_reader == 0) return true;
- if(active_writer == 0) return false;
- data.push_front(elem);
- return true;
- }
- _details::optional<T> pop() {
- std::lock_guard<std::mutex> d{lock};
- if(active_reader == 0) return _details::nullopt;
- return data.pop_back();
- }
- bool empty() const {
- return active_writer == 0 && data.empty();
- }
- };
- }
-
- /**
- * This is a read only channel of its specified template parameter type
- * @tparam T The type of element that is transmitted through the channel
- */
- template<typename T>
- class channel_r {
- std::shared_ptr<_details::channel_impl<T>> impl;
- public:
- /**
- * This constructor should **NOT** be used
- */
- channel_r(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl<T>> _impl)
- : impl(_impl) {
- impl->register_one(_details::channel_impl<T>::reader, _details::do_not_use);
- }
-
- /**
- * Copy constructor for channel receiver
- */
- channel_r(const channel_r& oth) {
- if(oth.impl.get() == impl.get()) return;
- impl = oth.impl;
- if(impl) impl->register_one(_details::channel_impl<T>::reader, _details::do_not_use);
- }
-
- /**
- * Obtains a value from the queue
- * @return either a value if one is available in the queue, or no value
- */
- _details::optional<T> pop() {
- return impl->pop();
- }
-
- /**
- * Verifies if the queue is empty and has no sender available
- * @return true if the queue will never contain new elements
- * @return false if the queue may at some point in the future contain new elements
- */
- bool empty() const {
- return impl->empty();
- }
-
- ~channel_r() {
- impl->close(_details::channel_impl<T>::reader);
- }
-
- /**
- * Verifies if the writer side is still open
- * @return true if the writer side is closed
- * @return false if the writer side is still open
- */
- bool closed() {
- return impl->closed(_details::channel_impl<T>::writer);
- }
- };
-
- /**
- * This is a write only channel of its specified template parameter type
- * @tparam T The type of element that is transmitted through the channel
- */
- template<typename T>
- class channel_w {
- std::shared_ptr<_details::channel_impl<T>> impl;
- public:
- /**
- * This constructor should **NOT** be used
- */
- channel_w(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl<T>> _impl)
- : impl(_impl) {
- impl->register_one(_details::channel_impl<T>::writer, _details::do_not_use);
- }
-
- /**
- * Copy constructor for channel sender
- */
- channel_w(const channel_w& oth) {
- if(oth.impl.get() == impl.get()) return;
- impl = oth.impl;
- if(impl) impl->register_one(_details::channel_impl<T>::writer, _details::do_not_use);
- }
-
- /**
- * Sends an element through the channel
- * @param elem the element to send
- * @return true is the element was successfully pushed
- */
- bool push(const T& elem) {
- return impl->push(elem);
- }
- /**
- * Sends an element through the channel
- * @param elem the element to send
- * @return true is the element was successfully pushed
- */
- bool push(T&& elem) {
- return impl->push(std::forward<T>(elem));
- }
- ~channel_w() {
- impl->close(_details::channel_impl<T>::writer);
- }
-
- /**
- * Verifies if the reader side is still open
- * @return true if the reader side is closed
- * @return false if the reader side is still open
- */
- bool closed() {
- return impl->closed(_details::channel_impl<T>::reader);
- }
- };
-
- /**
- * Creates a pipe made of a reader channel and a writer channel allowing to transmit objects of the specified type between units of concurrency
- * @tparam T The type to allow transfers of
- * @return a pair `[writer, reader]` that are connected
- */
- template<typename T>
- std::pair<channel_w<T>, channel_r<T>> make_pipe() {
- auto shared_ch = std::make_shared<_details::channel_impl<T>>();
- return {{_details::do_not_use, shared_ch}, {_details::do_not_use, shared_ch}};
- }
-
- template<typename T, typename awaiter = _details::awaiter>
- class promise;
-
- /**
- * This class allows instantiation and manipulation of the receiving side of a promise
- * @tparam T The type that will be received
- * @tparam awaiter A type that has an `operator()(int)` that represent how this object must wait
- */
- template<typename T, typename awaiter = _details::awaiter>
- class future {
- channel_r<T> conduit;
- friend class promise<T, awaiter>;
-
- future(channel_r<T>&& c) : conduit(c) {};
-
- public:
- /**
- * Waits for the future to be set
- */
- void wait() {
- for(;;) {
- if (conduit.closed()) return;
- awaiter{}(30);
- }
- }
-
- /**
- * Obtains the received value.
- *
- * @note This method should never be called before wait, and should not be called more than once
- * @return
- */
- _details::optional<T> get() {
- return conduit.pop();
- }
- };
-
- /**
- * This class allows to represent waiting for a value from one unit of concurrency into another one.
- * @tparam T The type of the value that will be transmitted
- * @tparam awaiter A type that has an `operator()(int)` that represent how `future`s created by this object must wait
- */
- template<typename T, typename awaiter>
- class promise {
- _details::optional<channel_w<T>> conduit;
- _details::optional<channel_r<T>> future_conduit;
- public:
- /**
- * Initializes a valid promise
- */
- promise() {
- auto pipe = make_pipe<T>();
- conduit = std::move(pipe.first);
- future_conduit = std::move(pipe.second);
- }
-
- /**
- * Obtains the receiving end of this promise
- *
- * @note This method should never be called more than once
- * @return A future that can be awaited from
- */
- future<T, awaiter> get_future() {
- defer(cleanup, future_conduit = _details::nullopt);
- return future<T, awaiter> {
- std::move(future_conduit.value())
- };
- }
-
- /**
- * Sets the value of the future to the provided value
- *
- * @note This method and methods sharing the same name should never be called more than once
- * @param value The value to transmit
- */
- void set_value(T&& value) {
- conduit.value().push(std::forward<T>(value));
- conduit = _details::nullopt;
- }
-
- /**
- * Sets the value of the future to the provided value
- *
- * @note This method and methods sharing the same name should never be called more than once
- * @param value The value to transmit
- */
- void set_value(const T& value) {
- conduit.value().push(value);
- conduit = _details::nullopt;
- }
- };
-
- #undef defer
|