#include #include "catch2/catch_all.hpp" #include "../LibSnugLog/include/disruptor.h" struct strategy { static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait; void wait() {} }; struct strategy_that_overflows { static constexpr overflow_response_t on_overflow = overflow_response_t::must_overflow; void wait() {} }; TEST_CASE("Disruptor works sequentially") { std::array buffer{}; disruptor v{buffer.data(), buffer.size()}; SECTION("117") { auto W = v.reserve_write(100); v[W.start] = 117; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 117); v.conclude_read(R); } SECTION("12") { { auto W = v.reserve_write(6); v[W.start] = 12; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 12); v.conclude_read(R); } { auto W = v.reserve_write(6); v[W.start] = 8; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 8); v.conclude_read(R); } } SECTION("Disruptor loop around") { std::multiset mset; for(int i = 0; i != 255; i++) { auto W = v.reserve_write(100); v[W.start] = (char)i; for(size_t idx = W.start; idx != W.end; idx = (idx+1)%v.size()) { v[idx] = (char)i; } v.conclude_write(W); auto R = v.reserve_read(); for(size_t idx = R.start; idx != R.end; idx = (idx+1)%v.size()) { mset.insert(v[idx]); } v.conclude_read(R); } for(int i = 0; i != 255; i++) { REQUIRE(mset.count((char)i) == 100); } } SECTION("Disruptor concurrent odd vs even") { std::atomic trigger = false; std::multiset mset; std::stringstream continuity; int acc = 0; for(int i = 0; i<= 255; i++) { acc+=i; } std::thread reader([&](){ int cnt = 0; while (cnt != acc) { auto R = v.reserve_read(); for (size_t idx = R.start; idx != R.end; idx = (idx + 1) % v.size()) { mset.insert(v[idx]); continuity << (char)v[idx]; } v.conclude_read(R); cnt += (R.end > R.start) * (R.end - R.start) + (R.end < R.start) * (v.size() - R.start + R.end); } }); std::thread even([&]() { while(!trigger.load()); for (int i = 2; i <= 255; i += 2) { auto W = v.reserve_write(i); v[W.start] = (char) i; for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { v[idx] = (char) i; } v.conclude_write(W); } }); std::thread odd([&]() { while(!trigger.load()); for (int i = 1; i <= 255; i += 2) { auto W = v.reserve_write(i); v[W.start] = (char) i; for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { v[idx] = (char) i; } v.conclude_write(W); } }); // byte received count test trigger.store(true); reader.join(); even.join(); odd.join(); for(int i = 1; i <= 255; i++) { REQUIRE(mset.count((char)i) == i); } // Continuity tests int changes = 0; auto str = continuity.str(); char current = *str.begin(); auto it = str.begin(); for(;it != str.end();) { while(it != str.end() && *it == current) {++it;} changes += 1; current = *it; } REQUIRE(changes == 255); } } TEST_CASE("Fails if buffer too small") { REQUIRE_THROWS_AS(disruptor(nullptr, page_size), disruptor_exception); } TEST_CASE("Fails if buffer size is 0") { REQUIRE_THROWS_AS(disruptor(nullptr, 0), disruptor_exception); } TEST_CASE("Fails if buffer too small") { REQUIRE_THROWS_AS(disruptor(nullptr, page_size), disruptor_exception); } TEST_CASE("Fails if buffer size is 0") { REQUIRE_THROWS_AS(disruptor(nullptr, 0), disruptor_exception); } TEST_CASE("Disruptor works on overflow mode if things are not too contentious", "[long][unstable]") { std::array buffer{}; disruptor v{buffer.data(), buffer.size()}; SECTION("117") { auto W = v.reserve_write(100); v[W.start] = 117; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 117); v.conclude_read(R); } SECTION("12") { { auto W = v.reserve_write(6); v[W.start] = 12; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 12); v.conclude_read(R); } { auto W = v.reserve_write(6); v[W.start] = 8; v.conclude_write(W); auto R = v.reserve_read(); REQUIRE(v[R.start]== 8); v.conclude_read(R); } } SECTION("Disruptor loop around") { std::multiset mset; for(int i = 0; i != 255; i++) { auto W = v.reserve_write(100); v[W.start] = (char)i; for(size_t idx = W.start; idx != W.end; idx = (idx+1)%v.size()) { v[idx] = (char)i; } v.conclude_write(W); auto R = v.reserve_read(); for(size_t idx = R.start; idx != R.end; idx = (idx+1)%v.size()) { mset.insert(v[idx]); } v.conclude_read(R); } for(int i = 0; i != 255; i++) { REQUIRE(mset.count((char)i) == 100); } } SECTION("Disruptor concurrent odd vs even") { std::atomic trigger = false; std::multiset mset; std::stringstream continuity; int acc = 0; for(int i = 0; i<= 255; i++) { acc+=i; } using namespace std::chrono_literals; std::thread reader([&](){ int cnt = 0; auto start = std::chrono::high_resolution_clock::now(); while (std::chrono::high_resolution_clock::now() - start < 150ms) { auto R = v.reserve_read(); for (size_t idx = R.start; idx != R.end; idx = (idx + 1) % v.size()) { mset.insert(v[idx]); continuity << (char)v[idx]; } v.conclude_read(R); cnt += (R.end > R.start) * (R.end - R.start) + (R.end < R.start) * (v.size() - R.start + R.end); } }); std::thread even([&]() { while(!trigger.load()); for (int i = 2; i <= 255; i += 2) { auto W = v.reserve_write(i); v[W.start] = (char) i; for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { v[idx] = (char) i; } v.conclude_write(W); std::this_thread::sleep_for(50us); } }); std::thread odd([&]() { while(!trigger.load()); for (int i = 1; i <= 255; i += 2) { auto W = v.reserve_write(i); v[W.start] = (char) i; for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) { v[idx] = (char) i; } v.conclude_write(W); std::this_thread::sleep_for(50us); } }); // byte received count test trigger.store(true); reader.join(); even.join(); odd.join(); uint16_t cnt = 0; for(int i = 1; i <= 255; i++) { cnt += (mset.count((char)i) == i); } REQUIRE(cnt >= 100); // Continuity tests int changes = 0; auto str = continuity.str(); char current = *str.begin(); auto it = str.begin(); for(;it != str.end();) { while(it != str.end() && *it == current) {++it;} changes += 1; current = *it; } REQUIRE(changes >= 100); } }