#pragma once

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <new>
#include <optional>
#include <stdexcept>
#include <type_traits>
#ifdef __cpp_concepts
#include <concepts>
#endif
#if defined(_MSC_VER)
#include <intrin.h> // __nop()
#endif

#ifdef __cpp_lib_hardware_interference_size
static constexpr size_t max_interference_size = std::max(
    std::hardware_constructive_interference_size,
    std::hardware_destructive_interference_size);
static constexpr size_t line_length = std::hardware_constructive_interference_size;
#else
static constexpr size_t max_interference_size = 128;
static constexpr size_t line_length = 64;
#endif

template <typename T>
struct alignas(max_interference_size) padded_atomic final {
    std::atomic<T> value;
};

using offset_t = size_t;

extern const size_t page_size;

enum class overflow_response_t { must_wait, must_overflow };

#ifdef __cpp_concepts
// Note: decltype((T::on_overflow)) of a static constexpr member is a const
// lvalue reference, hence the reference type in the same_as check.
template <typename T>
concept OverflowStrategyType = requires(T strategy) {
    { T::on_overflow } -> std::same_as<const overflow_response_t&>;
    { strategy.wait() };
};
#endif

struct force_contiguous_mode {};

struct token_t {
    offset_t start;
    offset_t end;
};

struct disruptor_exception : public std::runtime_error {
    disruptor_exception() : std::runtime_error("Unknown disruptor error") {}
    explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};

template <typename OverflowStrategy>
#ifdef __cpp_concepts
    requires OverflowStrategyType<OverflowStrategy>
#endif
class disruptor {
    char* buffer;
    size_t buffer_size;
    // Control block: four cursors plus the data-region size, each placed on
    // its own cache line to avoid false sharing.
    std::atomic<offset_t>& read_trailer;
    std::atomic<offset_t>& read_lead;
    std::atomic<offset_t>& write_trailer;
    std::atomic<offset_t>& write_lead;
    offset_t& max_offset;
    char* data_buffer;

public:
    disruptor(char* _buffer, const size_t _buffer_size)
        : buffer(_buffer)
        , buffer_size(_buffer_size)
        , read_trailer (*reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 0))
        , read_lead    (*reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 1))
        , write_trailer(*reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 2))
        , write_lead   (*reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 3))
        , max_offset   (*reinterpret_cast<offset_t*>(_buffer + max_interference_size * 4))
    {
        if (buffer_size <= page_size)
            throw disruptor_exception("buffer size too small to build a disruptor");
        // The data region starts at the first page boundary past the control block.
        data_buffer = buffer + max_interference_size * 5;
        if (data_buffer <= buffer + page_size) {
            data_buffer = buffer + page_size;
            max_offset = buffer_size - page_size;
        } else {
            size_t ctr = 0;
            do {
                ctr++;
            } while (data_buffer > buffer + page_size * ctr);
            data_buffer = buffer + page_size * ctr;
            max_offset = buffer_size - page_size * ctr;
        }
    }

    template <bool must_be_contiguous>
    std::optional<token_t> try_advance(std::atomic<offset_t>& lead,
                                       std::atomic<offset_t>& fence,
                                       offset_t amount)
    {
        offset_t old_offset = lead.load(std::memory_order_relaxed);
        offset_t new_offset = old_offset + amount;
        offset_t fence_v = fence.load(std::memory_order_relaxed);

        // Check if we jumped the fence
        if (fence_v <= new_offset && fence_v > old_offset) {
            goto handle_fence;
        }

        // Check if we jumped the fence while wrapping around
        if (new_offset >= max_offset) {
            if constexpr (must_be_contiguous) {
                new_offset = amount; // restart at 0 so the region stays contiguous
            } else {
                new_offset %= max_offset;
            }
            if (fence_v <= new_offset) {
                goto handle_fence;
            }
        }
        goto handle_fence_end;

    handle_fence:
        if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_wait) {
            return std::nullopt;
        } else if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_overflow) {
            if (!fence.compare_exchange_weak(fence_v, new_offset,
                                             std::memory_order_release,
                                             std::memory_order_relaxed)) {
                return std::nullopt;
            } else {
                // Drag read_lead forward as well, so that no reader can still
                // claim the region we are about to overwrite.
                offset_t v;
                do {
                    v = read_lead.load(std::memory_order_acquire);
                    if (v >= new_offset) break;
                } while (!read_lead.compare_exchange_weak(v, new_offset,
                                                          std::memory_order_release,
                                                          std::memory_order_relaxed));
            }
        } else {
            static_assert(OverflowStrategy::on_overflow == overflow_response_t::must_wait ||
                          OverflowStrategy::on_overflow == overflow_response_t::must_overflow);
        }

    handle_fence_end:
        if (!lead.compare_exchange_weak(old_offset, new_offset,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
            return std::nullopt;
        }
        return token_t{old_offset, new_offset};
    }

    token_t reserve_write(size_t sz) {
        std::optional<token_t> tok;
        OverflowStrategy waiter;
        while (true) {
            tok = try_advance<false>(write_lead, read_trailer, sz);
            if (tok) break;
            waiter.wait();
        }
        return tok.value();
    }

    token_t reserve_write(size_t sz, force_contiguous_mode) {
        std::optional<token_t> tok;
        OverflowStrategy waiter;
        while (true) {
            tok = try_advance<true>(write_lead, read_trailer, sz);
            if (tok) break;
            waiter.wait();
        }
        return tok.value();
    }

    void conclude_write(token_t tok)
        noexcept(std::is_nothrow_invocable_v<decltype(&OverflowStrategy::wait), OverflowStrategy>)
    {
        // Wait until every earlier writer has published, then publish our own
        // region. A failed compare_exchange_weak overwrites its expected
        // argument, so reset it before retrying.
        OverflowStrategy waiter;
        offset_t expected = tok.start;
        while (!write_trailer.compare_exchange_weak(expected, tok.end,
                                                    std::memory_order_release,
                                                    std::memory_order_relaxed)) {
            expected = tok.start;
            waiter.wait();
        }
    }

    token_t reserve_read() {
        offset_t old_offset = read_lead.load(std::memory_order_relaxed);
        offset_t new_offset = write_trailer.load(std::memory_order_relaxed);
        // The writer has wrapped around: claim up to the wrap point first.
        if (old_offset > new_offset) new_offset = 0;
        OverflowStrategy waiter;
        while (!read_lead.compare_exchange_weak(old_offset, new_offset,
                                                std::memory_order_acquire,
                                                std::memory_order_relaxed)) {
            waiter.wait();
        }
        return token_t{old_offset, new_offset};
    }

    void conclude_read(token_t tok)
        noexcept(std::is_nothrow_invocable_v<decltype(&OverflowStrategy::wait), OverflowStrategy>)
    {
        OverflowStrategy waiter;
        offset_t expected = tok.start;
        while (!read_trailer.compare_exchange_weak(expected, tok.end,
                                                   std::memory_order_release,
                                                   std::memory_order_relaxed)) {
            expected = tok.start;
            waiter.wait();
        }
    }
};

struct OverflowWait {
    static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
    void wait() {
#if defined(__clang__) || defined(__GNUC__)
        __asm__("nop\n\t");
#elif defined(_MSC_VER)
        __nop();
#endif
    }
};
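
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the interface). It assumes that
// `page_size` is defined in exactly one translation unit (e.g. 4096, or the
// value reported by the OS), that the caller owns a buffer large enough for
// the control block plus at least one data page, and that token offsets index
// into the data region that starts at the first page boundary past the
// control block.
//
//     const size_t page_size = 4096; // assumed definition for the extern above
//
//     alignas(max_interference_size) static char storage[1 << 20] = {};
//     disruptor<OverflowWait> d(storage, sizeof(storage));
//
//     // Writer: reserve a region, fill it, then publish it.
//     token_t wt = d.reserve_write(64);
//     // ... write 64 bytes at offsets [wt.start, wt.end) of the data region ...
//     d.conclude_write(wt);
//
//     // Reader: claim everything published so far, consume it, release it.
//     token_t rt = d.reserve_read();
//     // ... read bytes at offsets [rt.start, rt.end) of the data region ...
//     d.conclude_read(rt);
//
// A region that never wraps past max_offset can be requested with the tag
// overload: d.reserve_write(64, force_contiguous_mode{}).
// ---------------------------------------------------------------------------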