From b6cf0929710130b916ee0306a1bb41a4f92dd62d Mon Sep 17 00:00:00 2001 From: Ludovic 'Archivist' Lagouardette Date: Tue, 18 Oct 2022 16:14:59 +0200 Subject: [PATCH] Remove useless stuff --- LibSnugLog/public_include/sl/strategies.h | 54 ++++++++- Tests/disruptor.cpp | 136 ++++++++++++++++++++++ 2 files changed, 188 insertions(+), 2 deletions(-) diff --git a/LibSnugLog/public_include/sl/strategies.h b/LibSnugLog/public_include/sl/strategies.h index c1f6dfc..43d9b63 100644 --- a/LibSnugLog/public_include/sl/strategies.h +++ b/LibSnugLog/public_include/sl/strategies.h @@ -135,59 +135,109 @@ struct mapped_buffer { [[nodiscard]] size_t size() const { return _size; } }; +/** + * @brief a strategy type directing how buffers are allocated. This directs the buffer strategy to use a standard container to allocate. + */ struct BufferStrategyInternal { using buffer_type = std::vector; static_assert(BufferLike); buffer_type build_buffer(size_t); }; +/** + * @brief a strategy that employs a memory mapped buffer to manage its memory. Said buffer is expected to be read from the same process. + */ struct BufferStrategyShared { using buffer_type = mapped_buffer; static_assert(BufferLike); buffer_type build_buffer(size_t); }; + +/** + * @brief a strategy that employs a memory mapped buffer to manage its memory. Said buffer is expected to be read from a different process. + */ struct BufferStrategyExternal { using buffer_type = mapped_buffer; static_assert(BufferLike); buffer_type build_buffer(size_t); }; +/* * ... * */ + +/** + * @brief a strategy that makes the sink flush at every write, making the IO to disk as soon as possible. + */ struct SinkStrategyDirect { void write(int fd, std::string_view data); }; + +/** + * @brief a strategy that makes the sink write with the settings that have the highest throughput. + */ struct SinkStrategyFastest { void write(int fd, std::string_view data); }; + +/** + * @brief a strategy that makes the sink write using memory mapped IO, making the IO always commit to system pages before returning. + */ struct SinkStrategyMmaped { void write(int fd, std::string_view data); }; + +/** + * @brief a strategy that makes the sink do nothing, expecting another process to handle the process. + */ struct SinkStrategyExternal { void write(int fd, std::string_view data); }; +/* * ... * */ + +/** + * @brief a strategy that makes overflowing waiting. The waiting is handled by exponential backoff of factor 1.5 + */ struct OverflowStrategyWait { static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait; void wait(); }; +/** + * @brief a strategy that makes overflowing overwrite the data, possibly corrupting the generated log, without waiting. + */ struct OverflowStrategyContinue { static constexpr overflow_response_t on_overflow = overflow_response_t::must_overflow; void wait(); }; +/* * ... * */ + +/** + * @brief a strategy that controls how often new log files are generated. Logs are spaced by making them writable for only a certain amount of time. + */ struct OutputStrategyTimed { std::chrono::seconds interval; - std::chrono::time_point last_change; + std::string_view directory; + std::optional > last_change = std::nullopt; std::pair chunk(std::string_view); int on_write_completed_event(std::string_view, int); }; + +/** + * @brief a strategy that controls how often new log files are generated. Logs are spaced by making them at most one line longer that the amount of bytes specified. + */ struct OutputStrategySized { uint64_t interval; - uint64_t written_bytes; + std::string_view directory; + uint64_t written_bytes = 0; std::pair chunk(std::string_view); int on_write_completed_event(std::string_view, int); }; + +/** + * @brief a strategy that makes the logs be generated in the same file all the time. + */ struct OutputStrategySimple { std::pair chunk(std::string_view); int on_write_completed_event(std::string_view, int); diff --git a/Tests/disruptor.cpp b/Tests/disruptor.cpp index 340749f..3a1db17 100644 --- a/Tests/disruptor.cpp +++ b/Tests/disruptor.cpp @@ -7,6 +7,11 @@ struct strategy { void wait() {} }; +struct strategy_that_overflows { + static constexpr overflow_response_t on_overflow = overflow_response_t::must_overflow; + void wait() {} +}; + TEST_CASE("Disruptor works sequentially") { std::array buffer{}; disruptor v{buffer.data(), buffer.size()}; @@ -136,4 +141,135 @@ TEST_CASE("Fails if buffer too small") { TEST_CASE("Fails if buffer size is 0") { REQUIRE_THROWS_AS(disruptor(nullptr, 0), disruptor_exception); +} + +TEST_CASE("Disruptor works on overflow mode if things are not too contentious", "[long][unstable]") { + std::array buffer{}; + disruptor v{buffer.data(), buffer.size()}; + + SECTION("117") { + auto W = v.reserve_write(100); + v[W.start] = 117; + v.conclude_write(W); + auto R = v.reserve_read(); + REQUIRE(v[R.start]== 117); + v.conclude_read(R); + } + + SECTION("12") { + { + auto W = v.reserve_write(6); + v[W.start] = 12; + v.conclude_write(W); + auto R = v.reserve_read(); + REQUIRE(v[R.start]== 12); + v.conclude_read(R); + } + + { + auto W = v.reserve_write(6); + v[W.start] = 8; + v.conclude_write(W); + auto R = v.reserve_read(); + REQUIRE(v[R.start]== 8); + v.conclude_read(R); + } + } + + SECTION("Disruptor loop around") { + std::multiset mset; + for(int i = 0; i != 255; i++) { + auto W = v.reserve_write(100); + v[W.start] = (char)i; + for(size_t idx = W.start; idx != W.end; idx = (idx+1)%v.size()) { + v[idx] = (char)i; + } + v.conclude_write(W); + auto R = v.reserve_read(); + for(size_t idx = R.start; idx != R.end; idx = (idx+1)%v.size()) { + mset.insert(v[idx]); + } + v.conclude_read(R); + } + for(int i = 0; i != 255; i++) { + REQUIRE(mset.count((char)i) == 100); + } + } + + SECTION("Disruptor concurrent odd vs even") { + std::atomic trigger = false; + std::multiset mset; + std::stringstream continuity; + + int acc = 0; + for(int i = 0; i<= 255; i++) { + acc+=i; + } + + using namespace std::chrono_literals; + + std::thread reader([&](){ + int cnt = 0; + auto start = std::chrono::high_resolution_clock::now(); + while (std::chrono::high_resolution_clock::now() - start < 150ms) { + auto R = v.reserve_read(); + for (size_t idx = R.start; idx != R.end; idx = (idx + 1) % v.size()) { + mset.insert(v[idx]); + continuity << (char)v[idx]; + } + v.conclude_read(R); + cnt += (R.end > R.start) * (R.end - R.start) + + (R.end < R.start) * (v.size() - R.start + R.end); + } + }); + + std::thread even([&]() { + while(!trigger.load()); + for (int i = 2; i <= 255; i += 2) { + auto W = v.reserve_write(i); + v[W.start] = (char) i; + for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { + v[idx] = (char) i; + } + v.conclude_write(W); + std::this_thread::sleep_for(50us); + } + }); + + std::thread odd([&]() { + while(!trigger.load()); + for (int i = 1; i <= 255; i += 2) { + auto W = v.reserve_write(i); + v[W.start] = (char) i; + for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { + v[idx] = (char) i; + } + v.conclude_write(W); + std::this_thread::sleep_for(50us); + } + }); + + // byte received count test + trigger.store(true); + reader.join(); + even.join(); + odd.join(); + uint16_t cnt = 0; + for(int i = 1; i <= 255; i++) { + cnt += (mset.count((char)i) == i); + } + REQUIRE(cnt >= 100); + + // Continuity tests + int changes = 0; + auto str = continuity.str(); + char current = *str.begin(); + auto it = str.begin(); + for(;it != str.end();) { + while(it != str.end() && *it == current) {++it;} + changes += 1; + current = *it; + } + REQUIRE(changes >= 100); + } } \ No newline at end of file