Browse Source

Add 'channels.hpp'

master
Archivist 6 months ago
parent
commit
3679905baa
1 changed files with 534 additions and 0 deletions
  1. +534
    -0
      channels.hpp

+ 534
- 0
channels.hpp View File

@ -0,0 +1,534 @@
#pragma once
#include <mutex>
#include <algorithm>
#include <functional>
#include <memory>
#include <thread>
#include <optional>
namespace _details {
template<typename T>
using optional = std::optional<T>; //< Configuration constant on which optional to use to implement the rest of this file
auto nullopt = std::nullopt; //< Configuration constant on which optional to use to implement the rest of this file
struct awaiter {
void operator()(int duration) {
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
}
};
struct defer_impl {
std::function<void(void)> fn;
~defer_impl() {
fn();
}
};
#ifdef defer
#warning "A defer macro is already defined and will be undefined after this file"
#undef defer
#endif
#define defer(name, h) _details::defer_impl _defer_slice ## name {[&](){(h);}};
struct _do_not_use_t {
};
constexpr _do_not_use_t do_not_use{};
}
namespace container {
template<typename T>
class circular_buffer_iterator {
struct M {
T *buffer;
size_t capacity;
size_t index;
};
M m;
explicit circular_buffer_iterator(M members)
: m{members} {}
public:
using difference_type = size_t;
using value_type = T;
using pointer = T *;
using reference = T &;
using iterator_category = std::bidirectional_iterator_tag;
circular_buffer_iterator(T *buffer, size_t capacity, size_t index)
: m{M{
.buffer = buffer,
.capacity = capacity,
.index = index
}} {}
circular_buffer_iterator &operator++() {
m.index = (m.index + 1) bitand (m.capacity - 1);
return *this;
}
circular_buffer_iterator operator++(int) {
M oth = m;
m.index = (m.index + 1) bitand (m.capacity - 1);
return circular_buffer_iterator{oth};
}
circular_buffer_iterator &operator--() {
m.index = (m.index - 1) bitand (m.capacity - 1);
return *this;
}
circular_buffer_iterator operator--(int) {
M oth = m;
m.index = (m.index - 1) bitand (m.capacity - 1);
return circular_buffer_iterator{oth};
}
T &operator*() {
return m.buffer[m.index];
}
bool operator==(const circular_buffer_iterator &oth) {
return m == oth.m;
}
bool operator!=(const circular_buffer_iterator &oth) {
return m.buffer != oth.m.buffer or m.index != oth.m.index or m.capacity != oth.m.capacity;
}
};
template<typename T>
class circular_buffer {
struct M {
std::unique_ptr<char[]> buffer;
size_t capacity;
size_t start;
size_t end;
};
M m;
M null_members() {
return M{
.buffer = nullptr,
.capacity = 0,
.start = 0,
.end = 0
};
}
static size_t validate_capacity(size_t new_capacity) {
if (new_capacity < 2) return 2;
if ((new_capacity bitand (new_capacity - 1))) {
new_capacity = new_capacity & (new_capacity >> 1);
new_capacity = new_capacity & (new_capacity >> 2);
new_capacity = new_capacity & (new_capacity >> 4);
new_capacity = new_capacity & (new_capacity >> 8);
new_capacity = new_capacity & (new_capacity >> 16);
new_capacity = new_capacity & (new_capacity >> 31);
return new_capacity + 1;
}
return new_capacity;
}
explicit circular_buffer(M s) : m(std::move(s)) {}
public:
circular_buffer()
: m{null_members()} {}
circular_buffer(const circular_buffer &oth)
: m(null_members()) {
if (oth.m.capacity == 0) return;
reserve(oth.m.capacity);
for (const auto &elem: oth) {
push_back(elem);
}
}
circular_buffer(circular_buffer &&oth) noexcept
: m{std::exchange(oth.m, null_members())} {}
circular_buffer &operator=(const circular_buffer &oth) {
if (this == &oth) return *this;
while (pop_front());
reserve(oth.m.capacity);
for (const auto &elem: oth) {
push_back(elem);
}
};
circular_buffer &operator=(circular_buffer &&oth) noexcept {
std::swap(m, oth.m);
}
circular_buffer_iterator<T> begin() {
return circular_buffer_iterator<T>(reinterpret_cast<T *>(m.buffer.get()), m.capacity, m.start);
}
circular_buffer_iterator<T> end() {
return circular_buffer_iterator<T>(reinterpret_cast<T *>(m.buffer.get()), m.capacity, m.end);
}
circular_buffer_iterator<const T> begin() const {
return circular_buffer_iterator<const T>{
m.buffer.get(), m.capacity, m.start
};
}
circular_buffer_iterator<const T> end() const {
return circular_buffer_iterator<const T>{
m.buffer.get(), m.capacity, m.end
};
}
void reserve(size_t new_capacity) {
new_capacity = validate_capacity(new_capacity);
if (new_capacity <= m.capacity) return;
circular_buffer temp{M{
.buffer = std::make_unique<char[]>(new_capacity * sizeof(T)),
.capacity = new_capacity,
.start = 0,
.end = 0
}};
_details::optional <T> value = pop_front();
while (value) {
temp.push_back(std::move(value.value()));
value = pop_front();
}
std::swap(m, temp.m);
}
void push_front(T value) {
if (m.capacity == 0) reserve(2);
size_t new_start = (m.start - 1) bitand (m.capacity - 1);
if (new_start == m.end) {
reserve(m.capacity << 1);
}
new_start = (m.start - 1) bitand (m.capacity - 1);
::new(reinterpret_cast<T *>(m.buffer.get()) + new_start) T(value);
m.start = new_start;
}
_details::optional <T> pop_front() noexcept {
if (m.start == m.end) return _details::nullopt;
_details::optional <T> ret = std::move(*begin());
(*begin()).~T();
m.start = (m.start + 1) bitand (m.capacity - 1);
return ret;
}
void push_back(T value) {
if (m.capacity == 0) reserve(2);
size_t new_end = (m.end + 1) bitand (m.capacity - 1);
if (new_end == m.start) {
reserve(m.capacity << 1);
}
new_end = (m.end + 1) bitand (m.capacity - 1);
::new(reinterpret_cast<T *>(m.buffer.get()) + m.end) T(value);
m.end = new_end;
}
_details::optional <T> pop_back() noexcept {
if (m.start == m.end) return _details::nullopt;
auto last = --end();
_details::optional <T> ret = std::move(*last);
(*last).~T();
m.end = (m.end - 1) bitand (m.capacity - 1);
return ret;
}
bool empty() const {
return m.start == m.end;
}
~circular_buffer() {
while (not empty()) {
pop_front();
}
}
};
}
namespace _details {
template<typename T>
class channel_impl {
mutable std::mutex lock;
container::circular_buffer<T> data;
int active_writer = 0;
int active_reader = 0;
public:
struct reader_t{};
struct writer_t{};
static constexpr reader_t reader{};
static constexpr writer_t writer{};
void close(reader_t) {
std::lock_guard<std::mutex> d{lock};
active_reader -= 1;
}
void close(writer_t) {
std::lock_guard<std::mutex> d{lock};
active_writer -=1;
}
void register_one(reader_t, _details::_do_not_use_t) {
std::lock_guard<std::mutex> d{lock};
active_reader += 1;
}
void register_one(writer_t, _details::_do_not_use_t) {
std::lock_guard<std::mutex> d{lock};
active_writer += 1;
}
bool closed(reader_t) {
std::lock_guard<std::mutex> d{lock};
return active_reader == 0;
}
bool closed(writer_t) {
std::lock_guard<std::mutex> d{lock};
return active_writer == 0;
}
bool push(const T& elem) {
std::lock_guard<std::mutex> d{lock};
if(active_reader == 0) return true;
if(active_writer == 0) return false;
data.push_front(elem);
return true;
}
bool push(T&& elem) {
std::lock_guard<std::mutex> d{lock};
if(active_reader == 0) return true;
if(active_writer == 0) return false;
data.push_front(elem);
return true;
}
_details::optional<T> pop() {
std::lock_guard<std::mutex> d{lock};
if(active_reader == 0) return _details::nullopt;
return data.pop_back();
}
bool empty() const {
return active_writer == 0 && data.empty();
}
};
}
/**
* This is a read only channel of its specified template parameter type
* @tparam T The type of element that is transmitted through the channel
*/
template<typename T>
class channel_r {
std::shared_ptr<_details::channel_impl<T>> impl;
public:
/**
* This constructor should **NOT** be used
*/
channel_r(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl<T>> _impl)
: impl(_impl) {
impl->register_one(_details::channel_impl<T>::reader, _details::do_not_use);
}
/**
* Copy constructor for channel receiver
*/
channel_r(const channel_r& oth) {
if(oth.impl.get() == impl.get()) return;
impl = oth.impl;
if(impl) impl->register_one(_details::channel_impl<T>::reader, _details::do_not_use);
}
/**
* Obtains a value from the queue
* @return either a value if one is available in the queue, or no value
*/
_details::optional<T> pop() {
return impl->pop();
}
/**
* Verifies if the queue is empty and has no sender available
* @return true if the queue will never contain new elements
* @return false if the queue may at some point in the future contain new elements
*/
bool empty() const {
return impl->empty();
}
~channel_r() {
impl->close(_details::channel_impl<T>::reader);
}
/**
* Verifies if the writer side is still open
* @return true if the writer side is closed
* @return false if the writer side is still open
*/
bool closed() {
return impl->closed(_details::channel_impl<T>::writer);
}
};
/**
* This is a write only channel of its specified template parameter type
* @tparam T The type of element that is transmitted through the channel
*/
template<typename T>
class channel_w {
std::shared_ptr<_details::channel_impl<T>> impl;
public:
/**
* This constructor should **NOT** be used
*/
channel_w(_details::_do_not_use_t, std::shared_ptr<_details::channel_impl<T>> _impl)
: impl(_impl) {
impl->register_one(_details::channel_impl<T>::writer, _details::do_not_use);
}
/**
* Copy constructor for channel sender
*/
channel_w(const channel_w& oth) {
if(oth.impl.get() == impl.get()) return;
impl = oth.impl;
if(impl) impl->register_one(_details::channel_impl<T>::writer, _details::do_not_use);
}
/**
* Sends an element through the channel
* @param elem the element to send
* @return true is the element was successfully pushed
*/
bool push(const T& elem) {
return impl->push(elem);
}
/**
* Sends an element through the channel
* @param elem the element to send
* @return true is the element was successfully pushed
*/
bool push(T&& elem) {
return impl->push(std::forward<T>(elem));
}
~channel_w() {
impl->close(_details::channel_impl<T>::writer);
}
/**
* Verifies if the reader side is still open
* @return true if the reader side is closed
* @return false if the reader side is still open
*/
bool closed() {
return impl->closed(_details::channel_impl<T>::reader);
}
};
/**
* Creates a pipe made of a reader channel and a writer channel allowing to transmit objects of the specified type between units of concurrency
* @tparam T The type to allow transfers of
* @return a pair `[writer, reader]` that are connected
*/
template<typename T>
std::pair<channel_w<T>, channel_r<T>> make_pipe() {
auto shared_ch = std::make_shared<_details::channel_impl<T>>();
return {{_details::do_not_use, shared_ch}, {_details::do_not_use, shared_ch}};
}
template<typename T, typename awaiter = _details::awaiter>
class promise;
/**
* This class allows instantiation and manipulation of the receiving side of a promise
* @tparam T The type that will be received
* @tparam awaiter A type that has an `operator()(int)` that represent how this object must wait
*/
template<typename T, typename awaiter = _details::awaiter>
class future {
channel_r<T> conduit;
friend class promise<T, awaiter>;
future(channel_r<T>&& c) : conduit(c) {};
public:
/**
* Waits for the future to be set
*/
void wait() {
for(;;) {
if (conduit.closed()) return;
awaiter{}(30);
}
}
/**
* Obtains the received value.
*
* @note This method should never be called before wait, and should not be called more than once
* @return
*/
_details::optional<T> get() {
return conduit.pop();
}
};
/**
* This class allows to represent waiting for a value from one unit of concurrency into another one.
* @tparam T The type of the value that will be transmitted
* @tparam awaiter A type that has an `operator()(int)` that represent how `future`s created by this object must wait
*/
template<typename T, typename awaiter>
class promise {
_details::optional<channel_w<T>> conduit;
_details::optional<channel_r<T>> future_conduit;
public:
/**
* Initializes a valid promise
*/
promise() {
auto pipe = make_pipe<T>();
conduit = std::move(pipe.first);
future_conduit = std::move(pipe.second);
}
/**
* Obtains the receiving end of this promise
*
* @note This method should never be called more than once
* @return A future that can be awaited from
*/
future<T, awaiter> get_future() {
defer(cleanup, future_conduit = _details::nullopt);
return future<T, awaiter> {
std::move(future_conduit.value())
};
}
/**
* Sets the value of the future to the provided value
*
* @note This method and methods sharing the same name should never be called more than once
* @param value The value to transmit
*/
void set_value(T&& value) {
conduit.value().push(std::forward<T>(value));
conduit = _details::nullopt;
}
/**
* Sets the value of the future to the provided value
*
* @note This method and methods sharing the same name should never be called more than once
* @param value The value to transmit
*/
void set_value(const T& value) {
conduit.value().push(value);
conduit = _details::nullopt;
}
};
#undef defer

Loading…
Cancel
Save