Browse Source

A prototype for byte channel primitives

channel
Ludovic 'Archivist' Lagouardette 3 years ago
parent
commit
52c260e376
4 changed files with 123 additions and 0 deletions
  1. +8
    -0
      include/gp/containers/vector.hpp
  2. +85
    -0
      include/gp/ipc/channel.hpp
  3. +20
    -0
      include/gp/utils/pointers.hpp
  4. +10
    -0
      include/gp_config.hpp

+ 8
- 0
include/gp/containers/vector.hpp View File

@ -236,6 +236,14 @@ namespace gp{
return gp::move(ret_val);
}
void remove(pointer_iterator<T, 1> it) {
for(auto step = it + 1; step<end(); step++) {
(*it++) = gp::move(*step);
}
*rbegin().~T();
sz -= 1;
}
constexpr pointer_iterator<T, 1> begin()
{
return associated_iterator(&ary[0]);

+ 85
- 0
include/gp/ipc/channel.hpp View File

@ -0,0 +1,85 @@
#pragma once
#include "gp/containers/vector.hpp"
#include "gp/functional/optional.hpp"
#include "gp/ipc/bottleneck.hpp"
#include "gp/utils/allocators/allocator.hpp"
#include "gp/utils/allocators/buddy.hpp"
#include "gp/utils/pointers.hpp"
namespace gp {
/**
* @brief
*/
class channel {
allocator& self_allocator; //< This is from whatever created the channel and MUST outlive the channel
gp::unique_ptr<allocator> local_allocator_impl; //< The allocator used here
gp::vector<gp::vector<char>> data;
gp::fast_bottleneck lock;
public:
/**
* @brief Construct a new channel object
*
* @param memory_source Where do we allocate from for this channel
* @param memory_available The amount of memory to allocate, default specified in the configuration
*/
channel(allocator& memory_source, size_t memory_available = gp_config::limits::channel_default_size)
: self_allocator(memory_source)
, local_allocator_impl(
gp::unique_ptr<gp::buddy<gp_config::limits::channel_max_size/16,16>>
::make(self_allocator, memory_available, self_allocator)
.cast<allocator>()
)
, data(*local_allocator_impl)
{}
/**
* @brief Sends a message on the channel
*
* @param message The message to transmit through the channel, this implies two copies. Empty messages are invalid and hence not transmitted.
* @return true if the message was successfully sent
* @return false if the message was not sent
*/
bool try_send(const gp::buffer<char> message) {
if(message.size() == 0) return false;
auto guard = lock_guard(lock);
auto v = gp::vector<char>(*local_allocator_impl);
if(not v.reserve(message.size())) return false;
for(char c : message) {
v.push_back(c);
}
if(not data.push_back(v)) return false;
if((*data.rbegin()).size() == 0) return false;
return true;
}
/**
* @brief Tries to receive a message that matches a certain predicate
*
* @tparam predicate represents the predicate type, you will probably never specify that by hand
* @param allocate The allocator to allocate the response from
* @param pred The predicate
* @return gp::optional<gp::vector<char>> if the receive failed, this will be empty, else it will contain the message
*/
template<typename predicate>
gp::optional<gp::vector<char>> try_receive(
allocator allocate,
predicate pred = [](gp::buffer<char>){return true;}
) {
for(auto it = data.begin(); it != data.end(); ++it) {
auto& elem = *it;
if(predicate(elem.as_buffer())) {
auto v = gp::vector<char>(allocate);
if(not v.reserve(elem.size())) return nullopt;
for(char c : elem) {
v.push_back(c);
}
data.remove(it);
return gp::move(v);
}
}
return nullopt;
}
};
}

+ 20
- 0
include/gp/utils/pointers.hpp View File

@ -32,6 +32,13 @@ namespace gp {
return unique_ptr(new(ptr) T(gp::forward<U>(args)...), owner);
}
template<typename U>
unique_ptr<U> cast() {
auto save = data;
data = nullptr;
return unique_ptr<U>(safe, owner);
}
T* operator->() {
return data;
}
@ -80,6 +87,14 @@ namespace gp {
refcounter->store(1);
}
shared_ptr(T* _data, std::atomic_int* refctr, gp::allocator& _owner)
: data(_data)
, refcounter(refctr)
, owner(_owner)
{
refcounter->store(1);
}
void dirty_clear() {
if(!refcounter) return;
if(refcounter->fetch_sub(1, std::memory_order::acq_rel) == 0) {
@ -97,6 +112,11 @@ namespace gp {
return shared_ptr(new(ptr) T(gp::forward<U>(args)...), owner);
}
template<typename U>
shared_ptr<U> cast() {
return shared_ptr<U>(data, refcounter, owner);
}
T* operator->() {
return data;
}

+ 10
- 0
include/gp_config.hpp View File

@ -53,6 +53,16 @@ namespace gp_config{
*/
constexpr size_t max_processes = 4096;
/**
* @brief the maximum size a channel can address
*/
constexpr size_t channel_max_size = 1 << 20;
/**
* @brief the default size a channel will take
*/
constexpr size_t channel_default_size = 1 << 16;
/**
* @brief the number of open files each process is allowed to have
*/

Loading…
Cancel
Save