|
#pragma once
|
|
|
|
#include <cstddef>
|
|
#include <atomic>
|
|
#include <new>
|
|
#include <cassert>
|
|
#include <optional>
|
|
#include <iostream>
|
|
#include <any>
|
|
|
|
#include "sl/strategies.h"
|
|
|
|
// Cache-line related constants.
//  - max_interference_size: the larger of the constructive and destructive
//    interference sizes; used as alignment/spacing for values that must not
//    share a cache line.
//  - line_length: the constructive interference size.
// When the standard library does not advertise the interference-size
// constants, fixed fallbacks (128 and 64) are used instead.
//
// The maximum is computed with a conditional expression rather than std::max so
// that this header does not depend on <algorithm>, and the variables are
// `inline` so every translation unit refers to the same entity.
#ifdef __cpp_lib_hardware_interference_size
inline constexpr std::size_t max_interference_size =
    std::hardware_constructive_interference_size > std::hardware_destructive_interference_size
        ? std::hardware_constructive_interference_size
        : std::hardware_destructive_interference_size;
inline constexpr std::size_t line_length = std::hardware_constructive_interference_size;
#else
inline constexpr std::size_t max_interference_size = 128;
inline constexpr std::size_t line_length = 64;
#endif
|
|
|
|
template<typename T>
|
|
struct alignas(max_interference_size) padded_atomic final {
|
|
std::atomic<T> value;
|
|
};
|
|
|
|
/// Byte offset into a disruptor's payload area.
using offset_t = std::size_t;

/// Page granularity used when placing the payload area; defined in another
/// translation unit.
/// NOTE(review): presumably the operating system's memory page size - confirm at the definition.
extern const std::size_t page_size;

/// Tag type selecting the overloads that must hand out a region which does not
/// wrap around the end of the payload area.
struct force_contiguous_mode_t {};

/// Tag value to pass for force_contiguous_mode_t parameters.
/// Explicitly value-initialized (a constexpr object needs an initializer on
/// some compilers) and `inline` so that all translation units share one object.
inline constexpr force_contiguous_mode_t force_contiguous_mode{};

/// A reserved region, expressed as the cursor value before (`start`) and after
/// (`end`) the reservation. `end` may be smaller than `start` when the region
/// wraps around the end of the payload area.
struct token_t {
    offset_t start;
    offset_t end;
};
|
|
|
|
/**
 * Exception type thrown by the disruptor and its spans.
 * Carries a plain message through std::runtime_error.
 */
struct disruptor_exception : std::runtime_error {
    /// Builds the exception with the caller-provided message.
    explicit disruptor_exception(const char* str)
        : std::runtime_error(str)
    {}

    /// Builds the exception with a generic message.
    disruptor_exception()
        : disruptor_exception("Unknown error disruptor")
    {}
};
|
|
|
|
template<typename OverflowStrategyType>
|
|
#ifdef __cpp_concepts
|
|
requires OverflowStrategy<OverflowStrategyType>
|
|
#endif
|
|
class disruptor
|
|
{
|
|
char* buffer;
|
|
size_t buffer_size;
|
|
std::atomic<offset_t>* read_trailer;
|
|
std::atomic<offset_t>* read_lead;
|
|
std::atomic<offset_t>* write_trailer;
|
|
std::atomic<offset_t>* write_lead;
|
|
|
|
offset_t* max_offset;
|
|
|
|
char* data_buffer;
|
|
std::any buffer_life_extender;
|
|
std::any strategy_life_extender;
|
|
public:
|
|
/**
|
|
* Constructs a disruptor from the provided strategy, initialization_and_checks() is responsible for the initialization
|
|
* @param strategy The provided strategy. The provided strategy is preserved, intact if properly movable and if moving is "pointer stable". The obtained buffer is also preserved under the same supposition.
|
|
* @param _buffer_size The total size of the buffer to allocate, keep in mind this includes not only the
|
|
*/
|
|
template<BufferStrategy buffer_strategy>
|
|
disruptor(buffer_strategy strategy, std::string_view _buffer_filename, const size_t _buffer_size)
|
|
: read_trailer{}
|
|
, read_lead{}
|
|
, write_trailer{}
|
|
, write_lead{}
|
|
{
|
|
auto tmp = strategy.build_buffer(_buffer_filename, _buffer_size);
|
|
buffer = tmp.data();
|
|
buffer_size = tmp.size();
|
|
read_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*0);
|
|
read_lead = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*1);
|
|
write_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*2);
|
|
write_lead = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*3);
|
|
max_offset = reinterpret_cast<offset_t*> (buffer+max_interference_size*4);
|
|
buffer_life_extender = std::move(tmp);
|
|
strategy_life_extender = std::move(strategy);
|
|
initialization_and_checks();
|
|
}
|
|
|
|
/**
|
|
* Constructs a disruptor from the provided memory span, initialization_and_checks() is responsible for the initialization
|
|
* @ref initialization_and_checks()
|
|
* @param _buffer
|
|
* @param _buffer_size
|
|
*/
|
|
disruptor(char* _buffer, const size_t _buffer_size)
|
|
: buffer(_buffer)
|
|
, buffer_size(_buffer_size)
|
|
, read_trailer (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*0))
|
|
, read_lead (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*1))
|
|
, write_trailer(reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*2))
|
|
, write_lead (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*3))
|
|
, max_offset (reinterpret_cast<offset_t*> (_buffer+max_interference_size*4))
|
|
{
|
|
initialization_and_checks();
|
|
}
|
|
|
|
void initialization_and_checks() {
|
|
if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
|
|
data_buffer = buffer+max_interference_size*5;
|
|
if(data_buffer <= buffer+page_size) {
|
|
data_buffer = buffer+page_size;
|
|
*max_offset = buffer_size - page_size;
|
|
} else if(data_buffer > buffer+page_size) {
|
|
size_t ctr = 0;
|
|
do {
|
|
ctr++;
|
|
} while(data_buffer > buffer+page_size*ctr);
|
|
data_buffer = buffer+page_size*ctr;
|
|
*max_offset = buffer_size - page_size*ctr;
|
|
}
|
|
}
|
|
|
|
template<bool must_be_contiguous = false>
|
|
std::optional<token_t> try_advance(
|
|
std::atomic<offset_t>& lead,
|
|
std::atomic<offset_t>& fence,
|
|
offset_t amount
|
|
) {
|
|
offset_t old_offset = lead.load(std::memory_order_seq_cst);
|
|
offset_t new_offset = old_offset + amount;
|
|
offset_t fence_v = fence.load(std::memory_order_seq_cst);
|
|
|
|
// Check if we jumped the fence
|
|
if(fence_v <= new_offset && fence_v > old_offset) {
|
|
goto handle_fence;
|
|
}
|
|
|
|
// Check if we jumped the fence while overflowing
|
|
if(new_offset >= *max_offset) {
|
|
if constexpr(must_be_contiguous) {
|
|
new_offset = amount;
|
|
} else {
|
|
new_offset %= *max_offset;
|
|
}
|
|
if(fence_v <= new_offset) {
|
|
goto handle_fence;
|
|
}
|
|
}
|
|
|
|
goto handle_fence_end;
|
|
handle_fence:
|
|
if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
|
|
return std::nullopt;
|
|
} else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
|
|
if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_seq_cst)) {
|
|
return std::nullopt;
|
|
} else {
|
|
offset_t v;
|
|
do {
|
|
v = read_lead->load(std::memory_order_seq_cst);
|
|
if(v >= new_offset) break;
|
|
} while(read_lead->compare_exchange_weak(v, new_offset, std::memory_order_seq_cst));
|
|
}
|
|
} else {
|
|
static_assert(
|
|
OverflowStrategyType::on_overflow == overflow_response_t::must_wait
|
|
|| OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
|
|
);
|
|
}
|
|
handle_fence_end:
|
|
|
|
if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
|
|
return std::nullopt;
|
|
}
|
|
|
|
return token_t{old_offset, new_offset};
|
|
}
|
|
|
|
token_t reserve_write(size_t sz) {
|
|
std::optional<token_t> tok;
|
|
OverflowStrategyType waiter;
|
|
while(true) {
|
|
tok = try_advance(*write_lead, *read_trailer, sz);
|
|
if(tok) break;
|
|
waiter.wait();
|
|
}
|
|
// std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
|
|
return tok.value();
|
|
}
|
|
token_t reserve_write(size_t sz, force_contiguous_mode_t) {
|
|
std::optional<token_t> tok;
|
|
OverflowStrategyType waiter;
|
|
while(true) {
|
|
tok = try_advance<true>(*write_lead, *read_trailer, sz);
|
|
if(tok) break;
|
|
waiter.wait();
|
|
}
|
|
// std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
|
|
return tok.value();
|
|
}
|
|
|
|
static constexpr const auto& tmp_fn = &OverflowStrategyType::wait;
|
|
|
|
void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
|
|
OverflowStrategyType waiter;
|
|
while(!write_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
|
|
waiter.wait();
|
|
}
|
|
}
|
|
|
|
token_t reserve_read() {
|
|
offset_t old_offset = read_lead->load(std::memory_order_seq_cst);
|
|
offset_t new_offset = write_trailer->load(std::memory_order_seq_cst);
|
|
if(old_offset > new_offset) new_offset = 0;
|
|
OverflowStrategyType waiter;
|
|
|
|
while(!read_lead->compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
|
|
waiter.wait();
|
|
}
|
|
// std::cout << old_offset << " rr " << new_offset << std::endl;
|
|
return token_t{old_offset, new_offset};
|
|
}
|
|
void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
|
|
OverflowStrategyType waiter;
|
|
while(!read_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
|
|
waiter.wait();
|
|
}
|
|
// std::cout << tok.start << " cr " << tok.end << std::endl;
|
|
}
|
|
|
|
char& operator[](size_t offset) {
|
|
return data_buffer[offset];
|
|
}
|
|
|
|
char* data() {
|
|
return data_buffer;
|
|
}
|
|
|
|
[[nodiscard]] offset_t size() const {
|
|
return *max_offset;
|
|
}
|
|
};
|
|
|
|
|
|
class write_span {
|
|
offset_t start, end;
|
|
size_t target_sz;
|
|
char* target_buffer;
|
|
public:
|
|
template<typename of_strat>
|
|
write_span(token_t tok, disruptor<of_strat>& _target)
|
|
: start(tok.start)
|
|
, end(tok.end)
|
|
, target_sz(_target.size())
|
|
, target_buffer(_target.data())
|
|
{}
|
|
private:
|
|
write_span(const write_span& src, size_t ltrim)
|
|
: start((src.start + ltrim) % src.target_sz)
|
|
, end(src.end)
|
|
, target_sz(src.target_sz)
|
|
, target_buffer(src.target_buffer)
|
|
{}
|
|
public:
|
|
|
|
char& front() {
|
|
return target_buffer[start];
|
|
}
|
|
|
|
[[nodiscard]] size_t size() const {
|
|
return start <= end ? end - start : end + target_sz - start;
|
|
}
|
|
|
|
write_span subspan(size_t ltrim) {
|
|
if(ltrim > size()) throw disruptor_exception("write_span overflow, ltrim greater than available span");
|
|
|
|
return write_span(*this, ltrim);
|
|
}
|
|
|
|
[[nodiscard]] bool empty() const {
|
|
return end==start;
|
|
}
|
|
};
|