Przeglądaj źródła

Added a disruptor implementation

main
Ludovic 'Archivist' Lagouardette 2 lat temu
rodzic
commit
5f32c464e8
2 zmienionych plików z 179 dodań i 0 usunięć
  1. +177
    -0
      LibSnugLog/include/disruptor.h
  2. +2
    -0
      LibSnugLog/source/disruptor.cpp

+ 177
- 0
LibSnugLog/include/disruptor.h Wyświetl plik

@ -1,2 +1,179 @@
#pragma once
#include <cstddef>
#include <atomic>
#include <new>
#include <cassert>
#include <optional>
#ifdef __cpp_lib_hardware_interference_size
static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size);
static constexpr size_t line_length = std::hardware_constructive_interference_size;
#else
static constexpr size_t max_interference_size = 128;
static constexpr size_t line_length = 64;
#endif
template<typename T>
struct alignas(max_interference_size) padded_atomic final {
std::atomic<T> value;
};
using offset_t = size_t;
extern const size_t page_size;
enum class overflow_response_t {
must_wait,
must_overflow
};
#ifdef __cpp_concepts
template<typename T>
concept OverflowStrategyType = requires (T strategy) {
{T::on_overflow} -> std::same_as<overflow_response_t>;
{strategy.wait()};
};
#endif
struct force_contiguous_mode {};
struct token_t {offset_t start; offset_t end;};
struct disruptor_exception : public std::runtime_error {
disruptor_exception() : std::runtime_error("Unknown error disruptor") {}
explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};
template<typename OverflowStrategy>
#ifdef __cpp_concepts
requires OverflowStrategyType<OverflowStrategy>
#endif
class disruptor
{
char* buffer;
size_t buffer_size;
std::atomic<offset_t>& read_trailer;
std::atomic<offset_t>& read_lead;
std::atomic<offset_t>& write_trailer;
std::atomic<offset_t>& write_lead;
offset_t& max_offset;
char* data_buffer;
public:
disruptor(char* _buffer, const size_t _buffer_size)
: buffer(_buffer)
, buffer_size(_buffer_size)
, read_trailer (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*0))
, read_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*1))
, write_trailer(*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*2))
, write_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*3))
, max_offset (*reinterpret_cast<offset_t*> (_buffer+max_interference_size*4))
{
if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
data_buffer = buffer+max_interference_size*5;
if(data_buffer <= buffer+page_size) {
data_buffer = buffer+page_size;
max_offset = buffer_size - page_size;
} else if(data_buffer > buffer+page_size) {
size_t ctr = 0;
do {
ctr++;
} while(data_buffer > buffer+page_size*ctr);
data_buffer = buffer+page_size*ctr;
max_offset = buffer_size - page_size*ctr;
}
}
template<bool must_be_contiguous = false>
std::optional<token_t> try_advance(
std::atomic<offset_t>& lead,
std::atomic<offset_t> fence,
offset_t amount
) {
offset_t old_offset = lead.load(std::memory_order_relaxed);
offset_t new_offset = old_offset + amount;
offset_t fence_v = fence.load(std::memory_order_relaxed);
// Check if we jumped the fence
if(fence_v <= new_offset && fence_v > old_offset) {
return std::nullopt;
}
// Check if we jumped the fence while overflowing
if(new_offset >= max_offset) {
if constexpr(must_be_contiguous) {
new_offset = amount;
} else {
new_offset %= max_offset;
}
if(fence_v <= new_offset) {
return std::nullopt;
}
}
if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
return std::nullopt;
}
return token_t{old_offset, new_offset};
}
token_t reserve_write(size_t sz) {
std::optional<token_t> tok;
OverflowStrategy waiter;
while(true) {
tok = try_advance(write_lead, read_trailer, sz);
if(tok) break;
waiter.wait();
}
return tok.value();
}
token_t reserve_write(size_t sz, force_contiguous_mode) {
std::optional<token_t> tok;
OverflowStrategy waiter;
while(true) {
tok = try_advance<true>(write_lead, read_trailer, sz);
if(tok) break;
waiter.wait();
}
return tok.value();
}
void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<typename OverflowStrategy::wait>) {
OverflowStrategy waiter;
while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
}
token_t reserve_read() {
offset_t old_offset = read_lead.load(std::memory_order_relaxed);
offset_t new_offset = write_trailer.load(std::memory_order_relaxed);
if(old_offset > new_offset) new_offset = 0;
OverflowStrategy waiter;
while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
waiter.wait();
}
return token_t{old_offset, new_offset};
}
void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<typename OverflowStrategy::wait>) {
OverflowStrategy waiter;
while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
}
};
struct OverflowWait {
static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
void wait() {
#ifdef __clang__ or __GNUC__
__asm__("nop\n\t");
#elif _MSC_VER
__nop;
#endif
}
};

+ 2
- 0
LibSnugLog/source/disruptor.cpp Wyświetl plik

@ -1 +1,3 @@
#include "disruptor.h"
const size_t page_size = sysconf(_SC_PAGESIZE);

||||||
x
 
000:0
Ładowanie…
Anuluj
Zapisz