From 5f32c464e817210125ffcfe4546d090acba5d51e Mon Sep 17 00:00:00 2001 From: Ludovic 'Archivist' Lagouardette Date: Thu, 13 Oct 2022 18:14:22 +0200 Subject: [PATCH] Added a disruptor implementation --- LibSnugLog/include/disruptor.h | 177 ++++++++++++++++++++++++++++++++ LibSnugLog/source/disruptor.cpp | 2 + 2 files changed, 179 insertions(+) diff --git a/LibSnugLog/include/disruptor.h b/LibSnugLog/include/disruptor.h index 3f59c93..32c0588 100644 --- a/LibSnugLog/include/disruptor.h +++ b/LibSnugLog/include/disruptor.h @@ -1,2 +1,179 @@ #pragma once +#include +#include +#include +#include +#include + +#ifdef __cpp_lib_hardware_interference_size +static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size); +static constexpr size_t line_length = std::hardware_constructive_interference_size; +#else +static constexpr size_t max_interference_size = 128; +static constexpr size_t line_length = 64; +#endif + +template +struct alignas(max_interference_size) padded_atomic final { + std::atomic value; +}; + +using offset_t = size_t; +extern const size_t page_size; + +enum class overflow_response_t { + must_wait, + must_overflow +}; + +#ifdef __cpp_concepts +template +concept OverflowStrategyType = requires (T strategy) { + {T::on_overflow} -> std::same_as; + {strategy.wait()}; +}; +#endif + +struct force_contiguous_mode {}; +struct token_t {offset_t start; offset_t end;}; + +struct disruptor_exception : public std::runtime_error { + disruptor_exception() : std::runtime_error("Unknown error disruptor") {} + explicit disruptor_exception(const char* str) : std::runtime_error(str) {} +}; + +template +#ifdef __cpp_concepts + requires OverflowStrategyType +#endif +class disruptor +{ + char* buffer; + size_t buffer_size; + std::atomic& read_trailer; + std::atomic& read_lead; + + std::atomic& write_trailer; + std::atomic& write_lead; + + offset_t& max_offset; + + char* data_buffer; +public: + disruptor(char* _buffer, const size_t _buffer_size) + : buffer(_buffer) + , buffer_size(_buffer_size) + , read_trailer (*reinterpret_cast*>(_buffer+max_interference_size*0)) + , read_lead (*reinterpret_cast*>(_buffer+max_interference_size*1)) + , write_trailer(*reinterpret_cast*>(_buffer+max_interference_size*2)) + , write_lead (*reinterpret_cast*>(_buffer+max_interference_size*3)) + , max_offset (*reinterpret_cast (_buffer+max_interference_size*4)) + { + if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor"); + data_buffer = buffer+max_interference_size*5; + if(data_buffer <= buffer+page_size) { + data_buffer = buffer+page_size; + max_offset = buffer_size - page_size; + } else if(data_buffer > buffer+page_size) { + size_t ctr = 0; + do { + ctr++; + } while(data_buffer > buffer+page_size*ctr); + data_buffer = buffer+page_size*ctr; + max_offset = buffer_size - page_size*ctr; + } + } + + template + std::optional try_advance( + std::atomic& lead, + std::atomic fence, + offset_t amount + ) { + offset_t old_offset = lead.load(std::memory_order_relaxed); + offset_t new_offset = old_offset + amount; + offset_t fence_v = fence.load(std::memory_order_relaxed); + + // Check if we jumped the fence + if(fence_v <= new_offset && fence_v > old_offset) { + return std::nullopt; + } + + // Check if we jumped the fence while overflowing + if(new_offset >= max_offset) { + if constexpr(must_be_contiguous) { + new_offset = amount; + } else { + new_offset %= max_offset; + } + if(fence_v <= new_offset) { + return std::nullopt; + } + } + + if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) { + return std::nullopt; + } + + return token_t{old_offset, new_offset}; + } + + token_t reserve_write(size_t sz) { + std::optional tok; + OverflowStrategy waiter; + while(true) { + tok = try_advance(write_lead, read_trailer, sz); + if(tok) break; + waiter.wait(); + } + return tok.value(); + } + token_t reserve_write(size_t sz, force_contiguous_mode) { + std::optional tok; + OverflowStrategy waiter; + while(true) { + tok = try_advance(write_lead, read_trailer, sz); + if(tok) break; + waiter.wait(); + } + return tok.value(); + } + void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v) { + OverflowStrategy waiter; + while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) { + waiter.wait(); + } + } + + token_t reserve_read() { + offset_t old_offset = read_lead.load(std::memory_order_relaxed); + offset_t new_offset = write_trailer.load(std::memory_order_relaxed); + if(old_offset > new_offset) new_offset = 0; + OverflowStrategy waiter; + + while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) { + waiter.wait(); + } + + return token_t{old_offset, new_offset}; + } + void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v) { + OverflowStrategy waiter; + while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) { + waiter.wait(); + } + } +}; + +struct OverflowWait { + static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait; + void wait() { +#ifdef __clang__ or __GNUC__ + __asm__("nop\n\t"); +#elif _MSC_VER + __nop; +#endif + + } +}; \ No newline at end of file diff --git a/LibSnugLog/source/disruptor.cpp b/LibSnugLog/source/disruptor.cpp index 8b13789..7c51be5 100644 --- a/LibSnugLog/source/disruptor.cpp +++ b/LibSnugLog/source/disruptor.cpp @@ -1 +1,3 @@ +#include "disruptor.h" +const size_t page_size = sysconf(_SC_PAGESIZE); \ No newline at end of file