From 127ecaaa67be0f4b445c862b881f307cbf8e5d53 Mon Sep 17 00:00:00 2001 From: Ludovic 'Archivist' Lagouardette Date: Tue, 18 Oct 2022 11:47:56 +0200 Subject: [PATCH] Added the API for C++ strategies that will then translate to C --- LibSnugLog/CMakeLists.txt | 2 +- LibSnugLog/include/disruptor.h | 45 +++--- LibSnugLog/include/registry.h | 2 + LibSnugLog/public_include/sl/register.h | 133 +++++++++++++++++ LibSnugLog/public_include/sl/strategies.h | 153 ++++++++++++++++++- LibSnugLog/public_include/sl/transaction.h | 11 +- LibSnugLog/source/disruptor.cpp | 3 +- LibSnugLog/source/registry.cpp | 166 +++++++++++++++++++++ Tests/CMakeLists.txt | 1 + Tests/disruptor.cpp | 80 +++++++++- 10 files changed, 561 insertions(+), 35 deletions(-) create mode 100644 LibSnugLog/source/registry.cpp diff --git a/LibSnugLog/CMakeLists.txt b/LibSnugLog/CMakeLists.txt index 1ff12e8..bbff36e 100644 --- a/LibSnugLog/CMakeLists.txt +++ b/LibSnugLog/CMakeLists.txt @@ -5,4 +5,4 @@ include_directories(./include) add_library(LibSnugLog include/disruptor.h - source/disruptor.cpp include/sink.h include/registry.h include/source.h public_include/sl/strategies.h public_include/sl/register.h public_include/sl/transaction.h) \ No newline at end of file + source/disruptor.cpp include/sink.h include/registry.h include/source.h public_include/sl/strategies.h public_include/sl/register.h public_include/sl/transaction.h source/registry.cpp) \ No newline at end of file diff --git a/LibSnugLog/include/disruptor.h b/LibSnugLog/include/disruptor.h index dbd300a..7fcc9ad 100644 --- a/LibSnugLog/include/disruptor.h +++ b/LibSnugLog/include/disruptor.h @@ -7,6 +7,8 @@ #include #include +#include "sl/strategies.h" + #ifdef __cpp_lib_hardware_interference_size static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size); static constexpr size_t line_length = std::hardware_constructive_interference_size; @@ -23,19 +25,6 @@ struct alignas(max_interference_size) padded_atomic final { using offset_t = size_t; extern const size_t page_size; -enum class overflow_response_t { - must_wait, - must_overflow -}; - -#ifdef __cpp_concepts -template -concept OverflowStrategyType = requires (T strategy) { - {T::on_overflow} -> std::same_as; - {strategy.wait()}; -}; -#endif - struct force_contiguous_mode {}; struct token_t {offset_t start; offset_t end;}; @@ -44,9 +33,9 @@ struct disruptor_exception : public std::runtime_error { explicit disruptor_exception(const char* str) : std::runtime_error(str) {} }; -template +template #ifdef __cpp_concepts - requires OverflowStrategyType + requires OverflowStrategy #endif class disruptor { @@ -115,9 +104,9 @@ public: goto handle_fence_end; handle_fence: - if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_wait) { + if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) { return std::nullopt; - } else if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_overflow) { + } else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) { if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_release, std::memory_order_relaxed)) { return std::nullopt; } else { @@ -129,8 +118,8 @@ public: } } else { static_assert( - OverflowStrategy::on_overflow == overflow_response_t::must_wait - || OverflowStrategy::on_overflow == overflow_response_t::must_overflow + OverflowStrategyType::on_overflow == overflow_response_t::must_wait + || OverflowStrategyType::on_overflow == overflow_response_t::must_overflow ); } handle_fence_end: @@ -144,7 +133,7 @@ public: token_t reserve_write(size_t sz) { std::optional tok; - OverflowStrategy waiter; + OverflowStrategyType waiter; while(true) { tok = try_advance(write_lead, read_trailer, sz); if(tok) break; @@ -155,7 +144,7 @@ public: } token_t reserve_write(size_t sz, force_contiguous_mode) { std::optional tok; - OverflowStrategy waiter; + OverflowStrategyType waiter; while(true) { tok = try_advance(write_lead, read_trailer, sz); if(tok) break; @@ -165,10 +154,10 @@ public: return tok.value(); } - static constexpr const auto& tmp_fn = &OverflowStrategy::wait; + static constexpr const auto& tmp_fn = &OverflowStrategyType::wait; - void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v) { - OverflowStrategy waiter; + void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v) { + OverflowStrategyType waiter; while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) { waiter.wait(); } @@ -178,7 +167,7 @@ public: offset_t old_offset = read_lead.load(std::memory_order_relaxed); offset_t new_offset = write_trailer.load(std::memory_order_relaxed); if(old_offset > new_offset) new_offset = 0; - OverflowStrategy waiter; + OverflowStrategyType waiter; while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) { waiter.wait(); @@ -186,8 +175,8 @@ public: // std::cout << old_offset << " rr " << new_offset << std::endl; return token_t{old_offset, new_offset}; } - void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v) { - OverflowStrategy waiter; + void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v) { + OverflowStrategyType waiter; while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) { waiter.wait(); } @@ -213,4 +202,4 @@ struct OverflowWait { #endif } -}; \ No newline at end of file +}; diff --git a/LibSnugLog/include/registry.h b/LibSnugLog/include/registry.h index 6f70f09..45dcbb0 100644 --- a/LibSnugLog/include/registry.h +++ b/LibSnugLog/include/registry.h @@ -1 +1,3 @@ #pragma once + + diff --git a/LibSnugLog/public_include/sl/register.h b/LibSnugLog/public_include/sl/register.h index a6ef0bf..1ee0524 100644 --- a/LibSnugLog/public_include/sl/register.h +++ b/LibSnugLog/public_include/sl/register.h @@ -3,6 +3,10 @@ #ifdef __cplusplus #include #include +#include +#include +#include +#include #else #include #include @@ -11,18 +15,147 @@ #include "sl/strategies.h" +/** + * @brief represents a buffer strategy, generally as an obscure pointer + */ struct sl_buffer_strategy; + +/** + * @brief represents a sink strategy, generally as an obscure pointer + */ struct sl_sink_strategy; + +/** + * @brief represents an overflow strategy, generally as an obscure pointer + */ struct sl_overflow_strategy; + +/** + * @brief represents an output strategy, generally as an obscure pointer + */ struct sl_output_strategy; #ifdef __cplusplus namespace sl { + /** + * @brief represents a c-style buffer strategy + */ using buffer_strategy = sl_buffer_strategy; + + /** + * @brief represents a c-style sink strategy + */ using sink_strategy = sl_sink_strategy; + + /** + * @brief represents a c-style overflow strategy + */ using overflow_strategy = sl_overflow_strategy; + + /** + * @brief represents a c-style output strategy + */ using output_strategy = sl_output_strategy; } + +template +void register_log(std::string_view buffer_filename, uint64_t buffer_size, uint64_t out_strategy_parameter, std::string_view output_directory); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); + +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); +template<> void register_log(std::string_view, uint64_t, uint64_t, std::string_view); #else typedef struct sl_buffer_strategy sl_buffer_strategy; typedef struct sl_sink_strategy sl_sink_strategy; diff --git a/LibSnugLog/public_include/sl/strategies.h b/LibSnugLog/public_include/sl/strategies.h index e2963c3..c1f6dfc 100644 --- a/LibSnugLog/public_include/sl/strategies.h +++ b/LibSnugLog/public_include/sl/strategies.h @@ -1,5 +1,13 @@ #pragma once + +#ifdef __cplusplus + +#include +#include + +#endif + /** * @brief Describes what should happen if the allocated buffer is full and no logs can be written */ @@ -41,4 +49,147 @@ typedef enum SL_ON_SINK_FULL SL_ON_SINK_FULL; typedef enum SL_SINK_IO_TYPE SL_SINK_IO_TYPE; typedef enum SL_BUFFER_TYPE SL_BUFFER_TYPE; typedef enum SL_ROLL_LOGS SL_ROLL_LOGS; -#endif \ No newline at end of file +#endif + +#ifdef __cpp_concepts +/** + * @brief Represents the behaviour if the buffer would be overflowing. + */ +enum class overflow_response_t { + must_wait = 0, /**< Represents that on overflow, the writer should wait using the strategy provided wait function. */ + must_overflow = 1, /**< Represents that on overflow, the writer should push the fence and keep on writing immediately, even if the log would get corrupt. */ + must_drop = must_wait /**< UNUSED, may be used in the future to mean that the current write should be dropped. */ +}; + + +/** + * @brief This concept maps an overflow strategy. + */ +template +concept OverflowStrategy = requires (T strategy) { + {T::on_overflow} -> std::same_as; + {strategy.wait()}; +}; + +/** + * @brief This concept represents something that should be writable and mapped to valid memory within the software. + * + * @details This is used for mapping allocated buffers made from buffer strategies. + */ +template +concept BufferLike = requires (T buffer) { + {buffer.data()} -> std::same_as; + {buffer.size()} -> std::same_as; +}; + +/** + * @brief This concept maps a buffer strategy. + */ +template +concept BufferStrategy = requires (T strategy, size_t size) { + {strategy.build_buffer(size)} -> BufferLike; +}; + +/** + * @brief This concept maps a sink strategy. + */ +template +concept SinkStrategy = requires (T strategy, int fd, std::string_view data) { + {strategy.write(fd, data)}; +}; + +/** + * @brief This concept maps an output strategy. + */ +template +concept OutputStrategy = requires (T strategy, std::string_view source, int fd) { + {strategy.chunk(source)} -> std::same_as>; + {strategy.on_write_completed_event(source, fd)} -> std::same_as; +}; +#endif + +#ifdef __cplusplus +#ifndef __cpp_concepts + +#define OverflowStrategy typename +#define BufferStrategy typename +#define SinkStrategy typename +#define OutputStrategy typename + +#endif + +/** + * @brief Represent a mapped buffer. You should generally not touch this as it is internal representation for strategies. + */ +struct mapped_buffer { + char* _data; + size_t _size; + + /** + * @return the data pointer of the buffer + */ + [[nodiscard]] char* data() const { return _data; } + /** + * @return the size of the allocated buffer + */ + [[nodiscard]] size_t size() const { return _size; } +}; + +struct BufferStrategyInternal { + using buffer_type = std::vector; + static_assert(BufferLike); + buffer_type build_buffer(size_t); +}; + +struct BufferStrategyShared { + using buffer_type = mapped_buffer; + static_assert(BufferLike); + buffer_type build_buffer(size_t); +}; +struct BufferStrategyExternal { + using buffer_type = mapped_buffer; + static_assert(BufferLike); + buffer_type build_buffer(size_t); +}; + +struct SinkStrategyDirect { + void write(int fd, std::string_view data); +}; +struct SinkStrategyFastest { + void write(int fd, std::string_view data); +}; +struct SinkStrategyMmaped { + void write(int fd, std::string_view data); +}; +struct SinkStrategyExternal { + void write(int fd, std::string_view data); +}; + +struct OverflowStrategyWait { + static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait; + void wait(); +}; +struct OverflowStrategyContinue { + static constexpr overflow_response_t on_overflow = overflow_response_t::must_overflow; + void wait(); +}; + +struct OutputStrategyTimed { + std::chrono::seconds interval; + std::chrono::time_point last_change; + + std::pair chunk(std::string_view); + int on_write_completed_event(std::string_view, int); +}; +struct OutputStrategySized { + uint64_t interval; + uint64_t written_bytes; + + std::pair chunk(std::string_view); + int on_write_completed_event(std::string_view, int); +}; +struct OutputStrategySimple { + std::pair chunk(std::string_view); + int on_write_completed_event(std::string_view, int); +}; +#endif diff --git a/LibSnugLog/public_include/sl/transaction.h b/LibSnugLog/public_include/sl/transaction.h index ae3258d..1f2984f 100644 --- a/LibSnugLog/public_include/sl/transaction.h +++ b/LibSnugLog/public_include/sl/transaction.h @@ -40,7 +40,7 @@ namespace sl { * @post * This class leaves the log in a valid state in accordance to the defined log strategies. */ - class log_transaction { + class log_transaction final { char* local_start; size_t local_size; char* buffer_start; @@ -53,7 +53,7 @@ namespace sl { /** * @brief A forward iterator to the current log transaction. */ - class iterator { + class iterator final { using iterator_category = std::forward_iterator_tag; using difference_type = std::ptrdiff_t; using value_type = char; @@ -177,7 +177,12 @@ typedef struct sl_log_transaction_internals log_transaction_internals; struct sl_log_transaction { char* begin; /**< The start of the given writable span */ char* end; /**< 1 element past the end of the writable span */ - sl_log_transaction_internals* internals; /** Internal, do not touch */ + + /** + * @brief Internal, do not touch + * @internal this is a pointer to a registry_slab, hence the need for reference stability/pointer stability + */ + sl_log_transaction_internals* internals; }; #ifdef __cplusplus diff --git a/LibSnugLog/source/disruptor.cpp b/LibSnugLog/source/disruptor.cpp index 7c51be5..8cbada4 100644 --- a/LibSnugLog/source/disruptor.cpp +++ b/LibSnugLog/source/disruptor.cpp @@ -1,3 +1,4 @@ #include "disruptor.h" -const size_t page_size = sysconf(_SC_PAGESIZE); \ No newline at end of file +const size_t page_size = sysconf(_SC_PAGESIZE); + diff --git a/LibSnugLog/source/registry.cpp b/LibSnugLog/source/registry.cpp new file mode 100644 index 0000000..a1b9758 --- /dev/null +++ b/LibSnugLog/source/registry.cpp @@ -0,0 +1,166 @@ +#include "registry.h" +#include "disruptor.h" +#include "sl/register.h" + +#include +#include +#include +#include +#include + + +/** + * @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding + */ +template +class fast_semaphore { + std::atomic flag; //< This is our counter +public: + fast_semaphore() = default; + fast_semaphore(fast_semaphore&) = delete; + fast_semaphore(fast_semaphore&&) = delete; + + /// 3 things may happen when trying to unlock + enum class tristate { + success = 1, //< The unlocking was successful + timing = 0, //< Someone interfered with the unlocking + error = -1 //< The unlocking would over-unlock the semaphore + }; + + /// We try locking until we succeed + template + void lock() { + while(not try_lock()); + } + + /// For locking, we try to atomically increment the counter while maintaining it below the set limit + template + [[nodiscard]] bool try_lock() { + T expect = flag.load(std::memory_order::acquire); + T target = expect + increment; + if(target > maximum_increment) return false; + return flag.compare_exchange_strong(expect,target,std::memory_order::release); + } + + /// Similarly to locking, we try unlocking until we succeed (or reach an invalid state) + template + void unlock() { + tristate v; + do{ v = try_unlock(); } + while(v == tristate::timing); + if(v != tristate::success) { + throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue"); + } + } + + /// Unlocking is the reverse of locking, we have to ensure to return an error if we try to go below zero + template + [[nodiscard]] tristate try_unlock() { + T expect = flag.load(std::memory_order::relaxed); + T target = expect - increment; + if(target < 0) return tristate::error; + return flag.compare_exchange_strong(expect,target,std::memory_order::release) ? tristate::success : tristate::timing; + } +}; + +using rw_lock_type = fast_semaphore; + +class lock_handler_read { + rw_lock_type& ref; + explicit lock_handler_read(rw_lock_type& _ref) : ref(_ref) { + while(ref.try_lock<1>()); + } + + ~lock_handler_read() { + while(ref.try_unlock<1>() != rw_lock_type::tristate::success); + } +}; + +class lock_handler_write { + rw_lock_type& ref; + explicit lock_handler_write(rw_lock_type& _ref) : ref(_ref) { + while(ref.try_lock()); + } + + ~lock_handler_write() { + while(ref.try_unlock() != rw_lock_type::tristate::success); + } +}; + +static fast_semaphore registry_rw_lock; + + +struct registry_slab { + int id; + std::string name; + std::function reserve_write; + std::function reserve_write_c_align; + std::function conclude_write; + std::any disruptor; +}; + +/** + * @internal used because we need the pointer stability + * @see sl_transaction + */ +static std::unordered_map registry_map; + +BufferStrategyInternal::buffer_type BufferStrategyInternal::build_buffer(size_t) { + return {}; +} + +BufferStrategyShared::buffer_type BufferStrategyShared::build_buffer(size_t) { + return {}; +} + +BufferStrategyExternal::buffer_type BufferStrategyExternal::build_buffer(size_t) { + return {}; +} + +void SinkStrategyDirect::write(int fd, std::string_view data) { + +} + +void SinkStrategyFastest::write(int fd, std::string_view data) { + +} + +void SinkStrategyMmaped::write(int fd, std::string_view data) { + +} + +void SinkStrategyExternal::write(int fd, std::string_view data) { + +} + +void OverflowStrategyWait::wait() { + +} + +void OverflowStrategyContinue::wait() { + +} + +std::pair OutputStrategyTimed::chunk(std::string_view) { + return {}; +} + +int OutputStrategyTimed::on_write_completed_event(std::string_view, int) { + return 0; +} + +std::pair OutputStrategySized::chunk(std::string_view) { + return {}; +} + +int OutputStrategySized::on_write_completed_event(std::string_view, int) { + return 0; +} + +std::pair OutputStrategySimple::chunk(std::string_view) { + return {}; +} + +int OutputStrategySimple::on_write_completed_event(std::string_view, int) { + return 0; +} diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index 3372535..392e86b 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -11,6 +11,7 @@ FetchContent_MakeAvailable(Catch2) add_executable(tests disruptor.cpp) target_link_libraries(tests PRIVATE LibSnugLog) target_link_libraries(tests PRIVATE Catch2::Catch2WithMain) +include_directories(../LibSnugLog/public_include) include(CTest) include(Catch) diff --git a/Tests/disruptor.cpp b/Tests/disruptor.cpp index 6b310df..340749f 100644 --- a/Tests/disruptor.cpp +++ b/Tests/disruptor.cpp @@ -1,3 +1,4 @@ +#include #include "catch2/catch_all.hpp" #include "../LibSnugLog/include/disruptor.h" @@ -49,7 +50,7 @@ TEST_CASE("Disruptor works sequentially") { } v.conclude_write(W); auto R = v.reserve_read(); - for(size_t idx = W.start; idx != W.end; idx = (idx+1)%v.size()) { + for(size_t idx = R.start; idx != R.end; idx = (idx+1)%v.size()) { mset.insert(v[idx]); } v.conclude_read(R); @@ -58,4 +59,81 @@ TEST_CASE("Disruptor works sequentially") { REQUIRE(mset.count((char)i) == 100); } } + + SECTION("Disruptor concurrent odd vs even") { + std::atomic trigger = false; + std::multiset mset; + std::stringstream continuity; + + int acc = 0; + for(int i = 0; i<= 255; i++) { + acc+=i; + } + + std::thread reader([&](){ + int cnt = 0; + while (cnt != acc) { + auto R = v.reserve_read(); + for (size_t idx = R.start; idx != R.end; idx = (idx + 1) % v.size()) { + mset.insert(v[idx]); + continuity << (char)v[idx]; + } + v.conclude_read(R); + cnt += (R.end > R.start) * (R.end - R.start) + + (R.end < R.start) * (v.size() - R.start + R.end); + } + }); + + std::thread even([&]() { + while(!trigger.load()); + for (int i = 2; i <= 255; i += 2) { + auto W = v.reserve_write(i); + v[W.start] = (char) i; + for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { + v[idx] = (char) i; + } + v.conclude_write(W); + } + }); + + std::thread odd([&]() { + while(!trigger.load()); + for (int i = 1; i <= 255; i += 2) { + auto W = v.reserve_write(i); + v[W.start] = (char) i; + for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { + v[idx] = (char) i; + } + v.conclude_write(W); + } + }); + + // byte received count test + trigger.store(true); + reader.join(); even.join(); odd.join(); + for(int i = 1; i <= 255; i++) { + REQUIRE(mset.count((char)i) == i); + } + + + // Continuity tests + int changes = 0; + auto str = continuity.str(); + char current = *str.begin(); + auto it = str.begin(); + for(;it != str.end();) { + while(it != str.end() && *it == current) {++it;} + changes += 1; + current = *it; + } + REQUIRE(changes == 255); + } +} + +TEST_CASE("Fails if buffer too small") { + REQUIRE_THROWS_AS(disruptor(nullptr, page_size), disruptor_exception); +} + +TEST_CASE("Fails if buffer size is 0") { + REQUIRE_THROWS_AS(disruptor(nullptr, 0), disruptor_exception); } \ No newline at end of file