A C++ library for logging very fast and without allocating.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

205 lignes
6.3 KiB

#pragma once
#include <cstddef>
#include <atomic>
#include <new>
#include <cassert>
#include <optional>
#include <iostream>
#include "sl/strategies.h"
#ifdef __cpp_lib_hardware_interference_size
static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size);
static constexpr size_t line_length = std::hardware_constructive_interference_size;
#else
static constexpr size_t max_interference_size = 128;
static constexpr size_t line_length = 64;
#endif
template<typename T>
struct alignas(max_interference_size) padded_atomic final {
std::atomic<T> value;
};
using offset_t = size_t;
extern const size_t page_size;
struct force_contiguous_mode {};
struct token_t {offset_t start; offset_t end;};
struct disruptor_exception : public std::runtime_error {
disruptor_exception() : std::runtime_error("Unknown error disruptor") {}
explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};
template<typename OverflowStrategyType>
#ifdef __cpp_concepts
requires OverflowStrategy<OverflowStrategyType>
#endif
class disruptor
{
char* buffer;
size_t buffer_size;
std::atomic<offset_t>& read_trailer;
std::atomic<offset_t>& read_lead;
std::atomic<offset_t>& write_trailer;
std::atomic<offset_t>& write_lead;
offset_t& max_offset;
char* data_buffer;
public:
disruptor(char* _buffer, const size_t _buffer_size)
: buffer(_buffer)
, buffer_size(_buffer_size)
, read_trailer (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*0))
, read_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*1))
, write_trailer(*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*2))
, write_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*3))
, max_offset (*reinterpret_cast<offset_t*> (_buffer+max_interference_size*4))
{
if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
data_buffer = buffer+max_interference_size*5;
if(data_buffer <= buffer+page_size) {
data_buffer = buffer+page_size;
max_offset = buffer_size - page_size;
} else if(data_buffer > buffer+page_size) {
size_t ctr = 0;
do {
ctr++;
} while(data_buffer > buffer+page_size*ctr);
data_buffer = buffer+page_size*ctr;
max_offset = buffer_size - page_size*ctr;
}
}
template<bool must_be_contiguous = false>
std::optional<token_t> try_advance(
std::atomic<offset_t>& lead,
std::atomic<offset_t>& fence,
offset_t amount
) {
offset_t old_offset = lead.load(std::memory_order_relaxed);
offset_t new_offset = old_offset + amount;
offset_t fence_v = fence.load(std::memory_order_relaxed);
// Check if we jumped the fence
if(fence_v <= new_offset && fence_v > old_offset) {
goto handle_fence;
}
// Check if we jumped the fence while overflowing
if(new_offset >= max_offset) {
if constexpr(must_be_contiguous) {
new_offset = amount;
} else {
new_offset %= max_offset;
}
if(fence_v <= new_offset) {
goto handle_fence;
}
}
goto handle_fence_end;
handle_fence:
if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
return std::nullopt;
} else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_release, std::memory_order_relaxed)) {
return std::nullopt;
} else {
offset_t v;
do {
v = read_lead.load(std::memory_order_acquire);
if(v >= new_offset) break;
} while(read_lead.compare_exchange_weak(v, new_offset, std::memory_order_release, std::memory_order_relaxed));
}
} else {
static_assert(
OverflowStrategyType::on_overflow == overflow_response_t::must_wait
|| OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
);
}
handle_fence_end:
if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
return std::nullopt;
}
return token_t{old_offset, new_offset};
}
token_t reserve_write(size_t sz) {
std::optional<token_t> tok;
OverflowStrategyType waiter;
while(true) {
tok = try_advance(write_lead, read_trailer, sz);
if(tok) break;
waiter.wait();
}
// std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
return tok.value();
}
token_t reserve_write(size_t sz, force_contiguous_mode) {
std::optional<token_t> tok;
OverflowStrategyType waiter;
while(true) {
tok = try_advance<true>(write_lead, read_trailer, sz);
if(tok) break;
waiter.wait();
}
// std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
return tok.value();
}
static constexpr const auto& tmp_fn = &OverflowStrategyType::wait;
void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
OverflowStrategyType waiter;
while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
}
token_t reserve_read() {
offset_t old_offset = read_lead.load(std::memory_order_relaxed);
offset_t new_offset = write_trailer.load(std::memory_order_relaxed);
if(old_offset > new_offset) new_offset = 0;
OverflowStrategyType waiter;
while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
waiter.wait();
}
// std::cout << old_offset << " rr " << new_offset << std::endl;
return token_t{old_offset, new_offset};
}
void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
OverflowStrategyType waiter;
while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
// std::cout << tok.start << " cr " << tok.end << std::endl;
}
char& operator[](size_t offset) {
return data_buffer[offset];
}
[[nodiscard]] offset_t size() const {
return max_offset;
}
};
struct OverflowWait {
static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
void wait() {
#if defined(__clang__) || defined(__GNUC__)
__asm__("nop\n\t");
#elif _MSC_VER
__nop;
#endif
}
};