|
|
- #pragma once
-
#include <any>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <new>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
-
- #include "sl/strategies.h"
-
// Build-time tunables. Either macro may be defined before this header is
// included (for instance on the compiler command line) to override the default.
//
// `#ifndef` is used rather than `#if not defined(...)`: the alternative token
// `not` is not accepted by every preprocessor in its default mode.
#ifndef FORCE_HW_INTERFERENCE
#define FORCE_HW_INTERFERENCE 64
#endif
#ifndef FORCE_LINE_LENGTH
#define FORCE_LINE_LENGTH 128
#endif

/// Alignment of padded_atomic and distance, in bytes, between the control
/// slots the disruptor places at the start of its buffer.
static constexpr size_t max_interference_size = FORCE_HW_INTERFERENCE;
/// Configured line length in bytes (not used by the code visible in this file).
static constexpr size_t line_length = FORCE_LINE_LENGTH;

// The value is fed to alignas(), which only accepts powers of two; fail early
// with a readable message instead of a cryptic error at the point of use.
static_assert(max_interference_size != 0
              && (max_interference_size & (max_interference_size - 1)) == 0,
              "FORCE_HW_INTERFERENCE must be a non-zero power of two");
-
- template<typename T>
- struct alignas(max_interference_size) padded_atomic final {
- std::atomic<T> value;
- };
-
/// Integer type of every cursor and offset handled by the disruptor.
using offset_t = std::size_t;

/// Page size in bytes. Only declared here; the definition lives elsewhere.
/// NOTE(review): presumably the OS memory page size - confirm at the definition.
extern const std::size_t page_size;
-
/// Tag type selecting the reserve_write() overload that asks for a
/// reservation that is not split across the end of the ring.
struct force_contiguous_mode_t {};

/// Tag value to pass as the second argument of reserve_write().
/// Declared `inline` so every translation unit shares one object, and
/// brace-initialised so the const object always has an explicit initializer.
inline constexpr force_contiguous_mode_t force_contiguous_mode{};
- struct token_t {offset_t start; offset_t end;};
-
/// Exception thrown by the disruptor and by write_span on misuse or on an
/// unusable buffer. Derives from std::runtime_error, so what() returns the
/// message passed at construction.
struct disruptor_exception : public std::runtime_error {
    /// Builds the exception with a generic message.
    disruptor_exception() : std::runtime_error("Unknown error disruptor") {}

    /// Builds the exception from a NUL-terminated message.
    explicit disruptor_exception(const char* str) : std::runtime_error(str) {}

    /// Builds the exception from a std::string, mirroring std::runtime_error,
    /// so callers can pass messages assembled at run time.
    explicit disruptor_exception(const std::string& str) : std::runtime_error(str) {}
};
-
- template<typename OverflowStrategyType>
- #ifdef __cpp_concepts
- requires OverflowStrategy<OverflowStrategyType>
- #endif
- class disruptor
- {
- char* buffer;
- size_t buffer_size;
- std::atomic<offset_t>* read_trailer;
- std::atomic<offset_t>* read_lead;
- std::atomic<offset_t>* write_trailer;
- std::atomic<offset_t>* write_lead;
-
- offset_t* max_offset;
-
- char* data_buffer;
- std::any buffer_life_extender;
- std::any strategy_life_extender;
- public:
- /**
- * Constructs a disruptor from the provided strategy, initialization_and_checks() is responsible for the initialization
- * @param strategy The provided strategy. The provided strategy is preserved, intact if properly movable and if moving is "pointer stable". The obtained buffer is also preserved under the same supposition.
- * @param _buffer_size The total size of the buffer to allocate, keep in mind this includes not only the
- */
- template<BufferStrategy buffer_strategy>
- disruptor(buffer_strategy strategy, std::string_view _buffer_filename, const size_t _buffer_size)
- : read_trailer{}
- , read_lead{}
- , write_trailer{}
- , write_lead{}
- {
- auto tmp = strategy.build_buffer(_buffer_filename, _buffer_size);
- buffer = tmp.data();
- buffer_size = tmp.size();
- read_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*0);
- read_lead = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*1);
- write_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*2);
- write_lead = reinterpret_cast<std::atomic<offset_t>*>(buffer+max_interference_size*3);
- max_offset = reinterpret_cast<offset_t*> (buffer+max_interference_size*4);
- buffer_life_extender = std::move(tmp);
- strategy_life_extender = std::move(strategy);
- initialization_and_checks();
- }
-
- /**
- * Constructs a disruptor from the provided memory span, initialization_and_checks() is responsible for the initialization
- * @ref initialization_and_checks()
- * @param _buffer
- * @param _buffer_size
- */
- disruptor(char* _buffer, const size_t _buffer_size)
- : buffer(_buffer)
- , buffer_size(_buffer_size)
- , read_trailer (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*0))
- , read_lead (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*1))
- , write_trailer(reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*2))
- , write_lead (reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*3))
- , max_offset (reinterpret_cast<offset_t*> (_buffer+max_interference_size*4))
- {
- initialization_and_checks();
- }
-
- void initialization_and_checks() {
- if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
- data_buffer = buffer+max_interference_size*5;
- if(data_buffer <= buffer+page_size) {
- data_buffer = buffer+page_size;
- *max_offset = buffer_size - page_size;
- } else if(data_buffer > buffer+page_size) {
- size_t ctr = 0;
- do {
- ctr++;
- } while(data_buffer > buffer+page_size*ctr);
- data_buffer = buffer+page_size*ctr;
- *max_offset = buffer_size - page_size*ctr;
- }
- }
-
- template<bool must_be_contiguous = false>
- std::optional<token_t> try_advance(
- std::atomic<offset_t>& lead,
- std::atomic<offset_t>& fence,
- offset_t amount
- ) {
- offset_t old_offset = lead.load(std::memory_order_seq_cst);
- offset_t new_offset = old_offset + amount;
- offset_t fence_v = fence.load(std::memory_order_seq_cst);
-
- // Check if we jumped the fence
- if(fence_v <= new_offset && fence_v > old_offset) {
- goto handle_fence;
- }
-
- // Check if we jumped the fence while overflowing
- if(new_offset >= *max_offset) {
- if constexpr(must_be_contiguous) {
- new_offset = amount;
- } else {
- new_offset %= *max_offset;
- }
- if(fence_v <= new_offset) {
- goto handle_fence;
- }
- }
-
- goto handle_fence_end;
- handle_fence:
- if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
- return std::nullopt;
- } else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
- if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_seq_cst)) {
- return std::nullopt;
- } else {
- offset_t v;
- do {
- v = read_lead->load(std::memory_order_seq_cst);
- if(v >= new_offset) break;
- } while(read_lead->compare_exchange_weak(v, new_offset, std::memory_order_seq_cst));
- }
- } else {
- static_assert(
- OverflowStrategyType::on_overflow == overflow_response_t::must_wait
- || OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
- );
- }
- handle_fence_end:
-
- if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
- return std::nullopt;
- }
-
- return token_t{old_offset, new_offset};
- }
-
- token_t reserve_write(size_t sz) {
- std::optional<token_t> tok;
- OverflowStrategyType waiter;
- while(true) {
- tok = try_advance(*write_lead, *read_trailer, sz);
- if(tok) break;
- waiter.wait();
- }
- // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
- return tok.value();
- }
- token_t reserve_write(size_t sz, force_contiguous_mode_t) {
- std::optional<token_t> tok;
- OverflowStrategyType waiter;
- while(true) {
- tok = try_advance<true>(*write_lead, *read_trailer, sz);
- if(tok) break;
- waiter.wait();
- }
- // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
- return tok.value();
- }
-
- static constexpr const auto& tmp_fn = &OverflowStrategyType::wait;
-
- void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
- OverflowStrategyType waiter;
- while(!write_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
- waiter.wait();
- }
- }
-
- token_t reserve_read() {
- offset_t old_offset = read_lead->load(std::memory_order_seq_cst);
- offset_t new_offset = write_trailer->load(std::memory_order_seq_cst);
- if(old_offset > new_offset) new_offset = 0;
- OverflowStrategyType waiter;
-
- while(!read_lead->compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
- waiter.wait();
- }
- // std::cout << old_offset << " rr " << new_offset << std::endl;
- return token_t{old_offset, new_offset};
- }
- void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
- OverflowStrategyType waiter;
- while(!read_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
- waiter.wait();
- }
- // std::cout << tok.start << " cr " << tok.end << std::endl;
- }
-
- char& operator[](size_t offset) {
- return data_buffer[offset];
- }
-
- char* data() {
- return data_buffer;
- }
-
- [[nodiscard]] offset_t size() const {
- return *max_offset;
- }
- };
-
-
- class write_span {
- offset_t start, end;
- size_t target_sz;
- char* target_buffer;
- public:
- template<typename of_strat>
- write_span(token_t tok, disruptor<of_strat>& _target)
- : start(tok.start)
- , end(tok.end)
- , target_sz(_target.size())
- , target_buffer(_target.data())
- {}
- private:
- write_span(const write_span& src, size_t ltrim)
- : start((src.start + ltrim) % src.target_sz)
- , end(src.end)
- , target_sz(src.target_sz)
- , target_buffer(src.target_buffer)
- {}
- public:
-
- char& front() {
- return target_buffer[start];
- }
-
- [[nodiscard]] size_t size() const {
- return start <= end ? end - start : end + target_sz - start;
- }
-
- write_span subspan(size_t ltrim) {
- if(ltrim > size()) throw disruptor_exception("write_span overflow, ltrim greater than available span");
-
- return write_span(*this, ltrim);
- }
-
- [[nodiscard]] bool empty() const {
- return end==start;
- }
- };
|