#pragma once

#include <any>
#include <atomic>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string_view>
#include <type_traits>

#include "sl/strategies.h"

#if !defined(FORCE_HW_INTERFERENCE)
#define FORCE_HW_INTERFERENCE 64
#endif

#if !defined(FORCE_LINE_LENGTH)
#define FORCE_LINE_LENGTH 128
#endif

static constexpr size_t max_interference_size = FORCE_HW_INTERFERENCE;
static constexpr size_t line_length = FORCE_LINE_LENGTH;

template <typename T>
struct alignas(max_interference_size) padded_atomic final {
    std::atomic<T> value;
};

using offset_t = size_t;

extern const size_t page_size;

struct force_contiguous_mode_t {};
inline constexpr force_contiguous_mode_t force_contiguous_mode;

struct token_t {
    offset_t start;
    offset_t end;
};

struct disruptor_exception : public std::runtime_error {
    disruptor_exception() : std::runtime_error("Unknown disruptor error") {}
    explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};

template <typename OverflowStrategyType>
#ifdef __cpp_concepts
    requires OverflowStrategy<OverflowStrategyType>
#endif
class disruptor {
    char* buffer;
    size_t buffer_size;
    std::atomic<offset_t>* read_trailer;
    std::atomic<offset_t>* read_lead;
    std::atomic<offset_t>* write_trailer;
    std::atomic<offset_t>* write_lead;
    offset_t* max_offset;
    char* data_buffer;
    std::any buffer_life_extender;
    std::any strategy_life_extender;

public:
    /**
     * Constructs a disruptor from the provided strategy; initialization_and_checks() is responsible for the initialization.
     * @param strategy The provided strategy. The strategy is preserved intact if it is properly movable and if moving it is "pointer stable". The obtained buffer is preserved under the same assumption.
     * @param _buffer_filename Name of the backing buffer, forwarded to the strategy's build_buffer().
     * @param _buffer_size The total size of the buffer to allocate; keep in mind this includes not only the data region but also the control block at the start of the buffer.
     */
    template <typename buffer_strategy>
    disruptor(buffer_strategy strategy, std::string_view _buffer_filename, const size_t _buffer_size)
        : read_trailer{}
        , read_lead{}
        , write_trailer{}
        , write_lead{}
    {
        auto tmp = strategy.build_buffer(_buffer_filename, _buffer_size);
        buffer = tmp.data();
        buffer_size = tmp.size();
        read_trailer  = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 0);
        read_lead     = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 1);
        write_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 2);
        write_lead    = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 3);
        max_offset    = reinterpret_cast<offset_t*>(buffer + max_interference_size * 4);
        buffer_life_extender = std::move(tmp);
        strategy_life_extender = std::move(strategy);
        initialization_and_checks();
    }

    /**
     * Constructs a disruptor from the provided memory span; initialization_and_checks() is responsible for the initialization.
     * @ref initialization_and_checks()
     * @param _buffer Caller-owned memory to build the disruptor in.
     * @param _buffer_size Total size of that memory.
     */
    disruptor(char* _buffer, const size_t _buffer_size)
        : buffer(_buffer)
        , buffer_size(_buffer_size)
        , read_trailer (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 0))
        , read_lead    (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 1))
        , write_trailer(reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 2))
        , write_lead   (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 3))
        , max_offset   (reinterpret_cast<offset_t*>(_buffer + max_interference_size * 4))
    {
        initialization_and_checks();
    }

    void initialization_and_checks() {
        if (buffer_size <= page_size)
            throw disruptor_exception("buffer size too small to build a disruptor");
        data_buffer = buffer + max_interference_size * 5;
        if (data_buffer <= buffer + page_size) {
            // The control block fits in the first page: data starts at the page boundary.
            data_buffer = buffer + page_size;
            *max_offset = buffer_size - page_size;
        } else {
            // The control block spills past the first page: align data to the next page boundary.
            size_t ctr = 0;
            do {
                ctr++;
            } while (data_buffer > buffer + page_size * ctr);
            data_buffer = buffer + page_size * ctr;
            *max_offset = buffer_size - page_size * ctr;
        }
    }
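
    // Control-block layout, as established by the constructors above: five
    // max_interference_size-aligned slots at the start of the buffer, one per
    // cursor, so no two cursors share a cache line (avoiding false sharing):
    //
    //   buffer + 0 * max_interference_size : read_trailer   (space released back to writers)
    //   buffer + 1 * max_interference_size : read_lead      (bytes claimed by readers)
    //   buffer + 2 * max_interference_size : write_trailer  (bytes published to readers)
    //   buffer + 3 * max_interference_size : write_lead     (bytes claimed by writers)
    //   buffer + 4 * max_interference_size : max_offset     (usable size of the data region)
    //
    // data_buffer then starts at the first page boundary past the control block.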
    template <bool must_be_contiguous = false>
    std::optional<token_t> try_advance(std::atomic<offset_t>& lead, std::atomic<offset_t>& fence, offset_t amount) {
        offset_t old_offset = lead.load(std::memory_order_seq_cst);
        offset_t new_offset = old_offset + amount;
        offset_t fence_v = fence.load(std::memory_order_seq_cst);

        // Check if we jumped the fence
        if (fence_v <= new_offset && fence_v > old_offset) {
            goto handle_fence;
        }

        // Check if we jumped the fence while overflowing
        if (new_offset >= *max_offset) {
            if constexpr (must_be_contiguous) {
                new_offset = amount; // restart at the beginning so the span stays contiguous
            } else {
                new_offset %= *max_offset;
            }
            if (fence_v <= new_offset) {
                goto handle_fence;
            }
        }
        goto handle_fence_end;

    handle_fence:
        if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
            return std::nullopt;
        } else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
            if (!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_seq_cst)) {
                return std::nullopt;
            } else {
                // The fence was pushed forward; drag read_lead up so it never
                // lags behind the overflowed fence.
                offset_t v;
                do {
                    v = read_lead->load(std::memory_order_seq_cst);
                    if (v >= new_offset) break;
                } while (read_lead->compare_exchange_weak(v, new_offset, std::memory_order_seq_cst));
            }
        } else {
            static_assert(
                OverflowStrategyType::on_overflow == overflow_response_t::must_wait ||
                OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
            );
        }

    handle_fence_end:
        if (!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            return std::nullopt;
        }
        return token_t{old_offset, new_offset};
    }

    token_t reserve_write(size_t sz) {
        std::optional<token_t> tok;
        OverflowStrategyType waiter;
        while (true) {
            tok = try_advance(*write_lead, *read_trailer, sz);
            if (tok) break;
            waiter.wait();
        }
        // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
        return tok.value();
    }

    token_t reserve_write(size_t sz, force_contiguous_mode_t) {
        std::optional<token_t> tok;
        OverflowStrategyType waiter;
        while (true) {
            tok = try_advance<true>(*write_lead, *read_trailer, sz);
            if (tok) break;
            waiter.wait();
        }
        // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
        return tok.value();
    }

    static constexpr auto tmp_fn = &OverflowStrategyType::wait;

    void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType&>) {
        OverflowStrategyType waiter;
        // compare_exchange_weak overwrites its expected argument on failure, so
        // re-arm it on every attempt: publication must wait until write_trailer
        // reaches exactly tok.start.
        offset_t expected = tok.start;
        while (!write_trailer->compare_exchange_weak(expected, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            expected = tok.start;
            waiter.wait();
        }
    }

    token_t reserve_read() {
        offset_t old_offset = read_lead->load(std::memory_order_seq_cst);
        offset_t new_offset = write_trailer->load(std::memory_order_seq_cst);
        // If the writers have wrapped behind us, claim up to the wrap point;
        // end == 0 marks a span running to the end of the data region.
        if (old_offset > new_offset) new_offset = 0;
        OverflowStrategyType waiter;
        while (!read_lead->compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            waiter.wait();
        }
        // std::cout << old_offset << " rr " << new_offset << std::endl;
        return token_t{old_offset, new_offset};
    }

    void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType&>) {
        OverflowStrategyType waiter;
        // Same re-arming as in conclude_write: wait for read_trailer to reach tok.start.
        offset_t expected = tok.start;
        while (!read_trailer->compare_exchange_weak(expected, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            expected = tok.start;
            waiter.wait();
        }
        // std::cout << tok.start << " cr " << tok.end << std::endl;
    }

    char& operator[](size_t offset) { return data_buffer[offset]; }
    char* data() { return data_buffer; }
    [[nodiscard]] offset_t size() const { return *max_offset; }
};

class write_span {
    offset_t start, end;
    size_t target_sz;
    char* target_buffer;

public:
    template <typename OverflowStrategyType>
    write_span(token_t tok, disruptor<OverflowStrategyType>& _target)
        : start(tok.start)
        , end(tok.end)
        , target_sz(_target.size())
        , target_buffer(_target.data())
    {}
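
    // start > end means the claimed region wraps around the end of the ring;
    // size() and subspan() below account for that.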
private:
    write_span(const write_span& src, size_t ltrim)
        : start((src.start + ltrim) % src.target_sz)
        , end(src.end)
        , target_sz(src.target_sz)
        , target_buffer(src.target_buffer)
    {}

public:
    char& front() { return target_buffer[start]; }

    [[nodiscard]] size_t size() const {
        return start <= end ? end - start : end + target_sz - start;
    }

    write_span subspan(size_t ltrim) {
        if (ltrim > size())
            throw disruptor_exception("write_span overflow, ltrim greater than available span");
        return write_span(*this, ltrim);
    }

    [[nodiscard]] bool empty() const { return end == start; }
};
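
/*
 * Usage sketch (illustrative, not part of the original header). It assumes a
 * minimal strategy satisfying the OverflowStrategy concept from
 * "sl/strategies.h": a static on_overflow constant (must_wait here) plus a
 * wait() member invoked between retries. The strategy name spin_strategy,
 * the buffer size, and the zero-initialized heap buffer are all hypothetical.
 *
 *   #include <vector>
 *
 *   struct spin_strategy {
 *       static constexpr auto on_overflow = overflow_response_t::must_wait;
 *       void wait() noexcept {} // busy-spin between retries
 *   };
 *
 *   void example() {
 *       std::vector<char> mem(16 * page_size, 0); // control block + data region
 *       disruptor<spin_strategy> d(mem.data(), mem.size());
 *
 *       token_t wr = d.reserve_write(64);  // claim 64 bytes for writing
 *       write_span span(wr, d);            // wrap-aware view of the claim
 *       span.front() = 'x';
 *       d.conclude_write(wr);              // publish the bytes to readers
 *
 *       token_t rd = d.reserve_read();     // claim everything published so far
 *       d.conclude_read(rd);               // release the space back to writers
 *   }
 */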