diff --git a/include/gp/containers/vector.hpp b/include/gp/containers/vector.hpp index 9d5ea85..510d830 100644 --- a/include/gp/containers/vector.hpp +++ b/include/gp/containers/vector.hpp @@ -236,6 +236,14 @@ namespace gp{ return gp::move(ret_val); } + void remove(pointer_iterator it) { + for(auto step = it + 1; step begin() { return associated_iterator(&ary[0]); diff --git a/include/gp/ipc/channel.hpp b/include/gp/ipc/channel.hpp new file mode 100644 index 0000000..f5da029 --- /dev/null +++ b/include/gp/ipc/channel.hpp @@ -0,0 +1,85 @@ +#pragma once + +#include "gp/containers/vector.hpp" +#include "gp/functional/optional.hpp" +#include "gp/ipc/bottleneck.hpp" +#include "gp/utils/allocators/allocator.hpp" +#include "gp/utils/allocators/buddy.hpp" +#include "gp/utils/pointers.hpp" + +namespace gp { + /** + * @brief + */ + class channel { + allocator& self_allocator; //< This is from whatever created the channel and MUST outlive the channel + gp::unique_ptr local_allocator_impl; //< The allocator used here + gp::vector> data; + gp::fast_bottleneck lock; + + public: + /** + * @brief Construct a new channel object + * + * @param memory_source Where do we allocate from for this channel + * @param memory_available The amount of memory to allocate, default specified in the configuration + */ + channel(allocator& memory_source, size_t memory_available = gp_config::limits::channel_default_size) + : self_allocator(memory_source) + , local_allocator_impl( + gp::unique_ptr> + ::make(self_allocator, memory_available, self_allocator) + .cast() + ) + , data(*local_allocator_impl) + {} + + /** + * @brief Sends a message on the channel + * + * @param message The message to transmit through the channel, this implies two copies. Empty messages are invalid and hence not transmitted. + * @return true if the message was successfully sent + * @return false if the message was not sent + */ + bool try_send(const gp::buffer message) { + if(message.size() == 0) return false; + auto guard = lock_guard(lock); + auto v = gp::vector(*local_allocator_impl); + if(not v.reserve(message.size())) return false; + for(char c : message) { + v.push_back(c); + } + if(not data.push_back(v)) return false; + if((*data.rbegin()).size() == 0) return false; + return true; + } + + /** + * @brief Tries to receive a message that matches a certain predicate + * + * @tparam predicate represents the predicate type, you will probably never specify that by hand + * @param allocate The allocator to allocate the response from + * @param pred The predicate + * @return gp::optional> if the receive failed, this will be empty, else it will contain the message + */ + template + gp::optional> try_receive( + allocator allocate, + predicate pred = [](gp::buffer){return true;} + ) { + for(auto it = data.begin(); it != data.end(); ++it) { + auto& elem = *it; + if(predicate(elem.as_buffer())) { + auto v = gp::vector(allocate); + if(not v.reserve(elem.size())) return nullopt; + for(char c : elem) { + v.push_back(c); + } + data.remove(it); + return gp::move(v); + } + } + return nullopt; + } + }; +} diff --git a/include/gp/utils/pointers.hpp b/include/gp/utils/pointers.hpp index f3f81e1..4b050e3 100644 --- a/include/gp/utils/pointers.hpp +++ b/include/gp/utils/pointers.hpp @@ -32,6 +32,13 @@ namespace gp { return unique_ptr(new(ptr) T(gp::forward(args)...), owner); } + template + unique_ptr cast() { + auto save = data; + data = nullptr; + return unique_ptr(safe, owner); + } + T* operator->() { return data; } @@ -80,6 +87,14 @@ namespace gp { refcounter->store(1); } + shared_ptr(T* _data, std::atomic_int* refctr, gp::allocator& _owner) + : data(_data) + , refcounter(refctr) + , owner(_owner) + { + refcounter->store(1); + } + void dirty_clear() { if(!refcounter) return; if(refcounter->fetch_sub(1, std::memory_order::acq_rel) == 0) { @@ -97,6 +112,11 @@ namespace gp { return shared_ptr(new(ptr) T(gp::forward(args)...), owner); } + template + shared_ptr cast() { + return shared_ptr(data, refcounter, owner); + } + T* operator->() { return data; } diff --git a/include/gp_config.hpp b/include/gp_config.hpp index d850838..e04f64f 100644 --- a/include/gp_config.hpp +++ b/include/gp_config.hpp @@ -53,6 +53,16 @@ namespace gp_config{ */ constexpr size_t max_processes = 4096; + /** + * @brief the maximum size a channel can address + */ + constexpr size_t channel_max_size = 1 << 20; + + /** + * @brief the default size a channel will take + */ + constexpr size_t channel_default_size = 1 << 16; + /** * @brief the number of open files each process is allowed to have */