Parcourir la source

Added the API for C++ strategies that will then translate to C

main
Ludovic 'Archivist' Lagouardette il y a 2 ans
Parent
révision
127ecaaa67
10 fichiers modifiés avec 561 ajouts et 35 suppressions
  1. +1
    -1
      LibSnugLog/CMakeLists.txt
  2. +17
    -28
      LibSnugLog/include/disruptor.h
  3. +2
    -0
      LibSnugLog/include/registry.h
  4. +133
    -0
      LibSnugLog/public_include/sl/register.h
  5. +152
    -1
      LibSnugLog/public_include/sl/strategies.h
  6. +8
    -3
      LibSnugLog/public_include/sl/transaction.h
  7. +2
    -1
      LibSnugLog/source/disruptor.cpp
  8. +166
    -0
      LibSnugLog/source/registry.cpp
  9. +1
    -0
      Tests/CMakeLists.txt
  10. +79
    -1
      Tests/disruptor.cpp

+ 1
- 1
LibSnugLog/CMakeLists.txt Voir le fichier

@ -5,4 +5,4 @@ include_directories(./include)
add_library(LibSnugLog
include/disruptor.h
source/disruptor.cpp include/sink.h include/registry.h include/source.h public_include/sl/strategies.h public_include/sl/register.h public_include/sl/transaction.h)
source/disruptor.cpp include/sink.h include/registry.h include/source.h public_include/sl/strategies.h public_include/sl/register.h public_include/sl/transaction.h source/registry.cpp)

+ 17
- 28
LibSnugLog/include/disruptor.h Voir le fichier

@ -7,6 +7,8 @@
#include <optional>
#include <iostream>
#include "sl/strategies.h"
#ifdef __cpp_lib_hardware_interference_size
static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size);
static constexpr size_t line_length = std::hardware_constructive_interference_size;
@ -23,19 +25,6 @@ struct alignas(max_interference_size) padded_atomic final {
using offset_t = size_t;
extern const size_t page_size;
enum class overflow_response_t {
must_wait,
must_overflow
};
#ifdef __cpp_concepts
template<typename T>
concept OverflowStrategyType = requires (T strategy) {
{T::on_overflow} -> std::same_as<const overflow_response_t&>;
{strategy.wait()};
};
#endif
struct force_contiguous_mode {};
struct token_t {offset_t start; offset_t end;};
@ -44,9 +33,9 @@ struct disruptor_exception : public std::runtime_error {
explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};
template<typename OverflowStrategy>
template<typename OverflowStrategyType>
#ifdef __cpp_concepts
requires OverflowStrategyType<OverflowStrategy>
requires OverflowStrategy<OverflowStrategyType>
#endif
class disruptor
{
@ -115,9 +104,9 @@ public:
goto handle_fence_end;
handle_fence:
if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_wait) {
if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
return std::nullopt;
} else if constexpr (OverflowStrategy::on_overflow == overflow_response_t::must_overflow) {
} else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_release, std::memory_order_relaxed)) {
return std::nullopt;
} else {
@ -129,8 +118,8 @@ public:
}
} else {
static_assert(
OverflowStrategy::on_overflow == overflow_response_t::must_wait
|| OverflowStrategy::on_overflow == overflow_response_t::must_overflow
OverflowStrategyType::on_overflow == overflow_response_t::must_wait
|| OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
);
}
handle_fence_end:
@ -144,7 +133,7 @@ public:
token_t reserve_write(size_t sz) {
std::optional<token_t> tok;
OverflowStrategy waiter;
OverflowStrategyType waiter;
while(true) {
tok = try_advance(write_lead, read_trailer, sz);
if(tok) break;
@ -155,7 +144,7 @@ public:
}
token_t reserve_write(size_t sz, force_contiguous_mode) {
std::optional<token_t> tok;
OverflowStrategy waiter;
OverflowStrategyType waiter;
while(true) {
tok = try_advance<true>(write_lead, read_trailer, sz);
if(tok) break;
@ -165,10 +154,10 @@ public:
return tok.value();
}
static constexpr const auto& tmp_fn = &OverflowStrategy::wait;
static constexpr const auto& tmp_fn = &OverflowStrategyType::wait;
void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategy>) {
OverflowStrategy waiter;
void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
OverflowStrategyType waiter;
while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
@ -178,7 +167,7 @@ public:
offset_t old_offset = read_lead.load(std::memory_order_relaxed);
offset_t new_offset = write_trailer.load(std::memory_order_relaxed);
if(old_offset > new_offset) new_offset = 0;
OverflowStrategy waiter;
OverflowStrategyType waiter;
while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
waiter.wait();
@ -186,8 +175,8 @@ public:
// std::cout << old_offset << " rr " << new_offset << std::endl;
return token_t{old_offset, new_offset};
}
void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategy>) {
OverflowStrategy waiter;
void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
OverflowStrategyType waiter;
while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
waiter.wait();
}
@ -213,4 +202,4 @@ struct OverflowWait {
#endif
}
};
};

+ 2
- 0
LibSnugLog/include/registry.h Voir le fichier

@ -1 +1,3 @@
#pragma once

+ 133
- 0
LibSnugLog/public_include/sl/register.h Voir le fichier

@ -3,6 +3,10 @@
#ifdef __cplusplus
#include <cstdint>
#include <cstddef>
#include <string_view>
#include <string>
#include <functional>
#include <any>
#else
#include <stdint.h>
#include <stddef.h>
@ -11,18 +15,147 @@
#include "sl/strategies.h"
/**
* @brief represents a buffer strategy, generally as an obscure pointer
*/
struct sl_buffer_strategy;
/**
* @brief represents a sink strategy, generally as an obscure pointer
*/
struct sl_sink_strategy;
/**
* @brief represents an overflow strategy, generally as an obscure pointer
*/
struct sl_overflow_strategy;
/**
* @brief represents an output strategy, generally as an obscure pointer
*/
struct sl_output_strategy;
#ifdef __cplusplus
namespace sl {
/**
* @brief represents a c-style buffer strategy
*/
using buffer_strategy = sl_buffer_strategy;
/**
* @brief represents a c-style sink strategy
*/
using sink_strategy = sl_sink_strategy;
/**
* @brief represents a c-style overflow strategy
*/
using overflow_strategy = sl_overflow_strategy;
/**
* @brief represents a c-style output strategy
*/
using output_strategy = sl_output_strategy;
}
template<BufferStrategy b_strategy_t, SinkStrategy s_strategy_t, OverflowStrategy over_strategy_t, OutputStrategy out_strategy_t >
void register_log(std::string_view buffer_filename, uint64_t buffer_size, uint64_t out_strategy_parameter, std::string_view output_directory);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyInternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyShared, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyDirect, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyFastest, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyMmaped, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyWait, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategyTimed>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySized>(std::string_view, uint64_t, uint64_t, std::string_view);
template<> void register_log<BufferStrategyExternal, SinkStrategyExternal, OverflowStrategyContinue, OutputStrategySimple>(std::string_view, uint64_t, uint64_t, std::string_view);
#else
typedef struct sl_buffer_strategy sl_buffer_strategy;
typedef struct sl_sink_strategy sl_sink_strategy;

+ 152
- 1
LibSnugLog/public_include/sl/strategies.h Voir le fichier

@ -1,5 +1,13 @@
#pragma once
#ifdef __cplusplus
#include <vector>
#include <chrono>
#endif
/**
* @brief Describes what should happen if the allocated buffer is full and no logs can be written
*/
@ -41,4 +49,147 @@ typedef enum SL_ON_SINK_FULL SL_ON_SINK_FULL;
typedef enum SL_SINK_IO_TYPE SL_SINK_IO_TYPE;
typedef enum SL_BUFFER_TYPE SL_BUFFER_TYPE;
typedef enum SL_ROLL_LOGS SL_ROLL_LOGS;
#endif
#endif
#ifdef __cpp_concepts
/**
* @brief Represents the behaviour if the buffer would be overflowing.
*/
enum class overflow_response_t {
must_wait = 0, /**< Represents that on overflow, the writer should wait using the strategy provided wait function. */
must_overflow = 1, /**< Represents that on overflow, the writer should push the fence and keep on writing immediately, even if the log would get corrupt. */
must_drop = must_wait /**< UNUSED, may be used in the future to mean that the current write should be dropped. */
};
/**
* @brief This concept maps an overflow strategy.
*/
template<typename T>
concept OverflowStrategy = requires (T strategy) {
{T::on_overflow} -> std::same_as<const overflow_response_t&>;
{strategy.wait()};
};
/**
* @brief This concept represents something that should be writable and mapped to valid memory within the software.
*
* @details This is used for mapping allocated buffers made from buffer strategies.
*/
template<typename T>
concept BufferLike = requires (T buffer) {
{buffer.data()} -> std::same_as<char*>;
{buffer.size()} -> std::same_as<size_t>;
};
/**
* @brief This concept maps a buffer strategy.
*/
template<typename T>
concept BufferStrategy = requires (T strategy, size_t size) {
{strategy.build_buffer(size)} -> BufferLike;
};
/**
* @brief This concept maps a sink strategy.
*/
template<typename T>
concept SinkStrategy = requires (T strategy, int fd, std::string_view data) {
{strategy.write(fd, data)};
};
/**
* @brief This concept maps an output strategy.
*/
template<typename T>
concept OutputStrategy = requires (T strategy, std::string_view source, int fd) {
{strategy.chunk(source)} -> std::same_as<std::pair<std::string_view, std::string_view>>;
{strategy.on_write_completed_event(source, fd)} -> std::same_as<int>;
};
#endif
#ifdef __cplusplus
#ifndef __cpp_concepts
#define OverflowStrategy typename
#define BufferStrategy typename
#define SinkStrategy typename
#define OutputStrategy typename
#endif
/**
* @brief Represent a mapped buffer. You should generally not touch this as it is internal representation for strategies.
*/
struct mapped_buffer {
char* _data;
size_t _size;
/**
* @return the data pointer of the buffer
*/
[[nodiscard]] char* data() const { return _data; }
/**
* @return the size of the allocated buffer
*/
[[nodiscard]] size_t size() const { return _size; }
};
struct BufferStrategyInternal {
using buffer_type = std::vector<char>;
static_assert(BufferLike<buffer_type>);
buffer_type build_buffer(size_t);
};
struct BufferStrategyShared {
using buffer_type = mapped_buffer;
static_assert(BufferLike<buffer_type>);
buffer_type build_buffer(size_t);
};
struct BufferStrategyExternal {
using buffer_type = mapped_buffer;
static_assert(BufferLike<buffer_type>);
buffer_type build_buffer(size_t);
};
struct SinkStrategyDirect {
void write(int fd, std::string_view data);
};
struct SinkStrategyFastest {
void write(int fd, std::string_view data);
};
struct SinkStrategyMmaped {
void write(int fd, std::string_view data);
};
struct SinkStrategyExternal {
void write(int fd, std::string_view data);
};
struct OverflowStrategyWait {
static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
void wait();
};
struct OverflowStrategyContinue {
static constexpr overflow_response_t on_overflow = overflow_response_t::must_overflow;
void wait();
};
struct OutputStrategyTimed {
std::chrono::seconds interval;
std::chrono::time_point<std::chrono::file_clock> last_change;
std::pair<std::string_view, std::string_view> chunk(std::string_view);
int on_write_completed_event(std::string_view, int);
};
struct OutputStrategySized {
uint64_t interval;
uint64_t written_bytes;
std::pair<std::string_view, std::string_view> chunk(std::string_view);
int on_write_completed_event(std::string_view, int);
};
struct OutputStrategySimple {
std::pair<std::string_view, std::string_view> chunk(std::string_view);
int on_write_completed_event(std::string_view, int);
};
#endif

+ 8
- 3
LibSnugLog/public_include/sl/transaction.h Voir le fichier

@ -40,7 +40,7 @@ namespace sl {
* @post
* This class leaves the log in a valid state in accordance to the defined log strategies.
*/
class log_transaction {
class log_transaction n">final {
char* local_start;
size_t local_size;
char* buffer_start;
@ -53,7 +53,7 @@ namespace sl {
/**
* @brief A forward iterator to the current log transaction.
*/
class iterator {
class iterator n">final {
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = char;
@ -177,7 +177,12 @@ typedef struct sl_log_transaction_internals log_transaction_internals;
struct sl_log_transaction {
char* begin; /**< The start of the given writable span */
char* end; /**< 1 element past the end of the writable span */
sl_log_transaction_internals* internals; /** Internal, do not touch */
/**
* @brief Internal, do not touch
* @internal this is a pointer to a registry_slab, hence the need for reference stability/pointer stability
*/
sl_log_transaction_internals* internals;
};
#ifdef __cplusplus

+ 2
- 1
LibSnugLog/source/disruptor.cpp Voir le fichier

@ -1,3 +1,4 @@
#include "disruptor.h"
const size_t page_size = sysconf(_SC_PAGESIZE);
const size_t page_size = sysconf(_SC_PAGESIZE);

+ 166
- 0
LibSnugLog/source/registry.cpp Voir le fichier

@ -0,0 +1,166 @@
#include "registry.h"
#include "disruptor.h"
#include "sl/register.h"
#include <concepts>
#include <atomic>
#include <unordered_map>
#include <functional>
#include <any>
/**
* @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding
*/
template<std::integral T, T default_increment, T maximum_increment>
class fast_semaphore {
std::atomic<T> flag; //< This is our counter
public:
fast_semaphore() = default;
fast_semaphore(fast_semaphore&) = delete;
fast_semaphore(fast_semaphore&&) = delete;
/// 3 things may happen when trying to unlock
enum class tristate {
success = 1, //< The unlocking was successful
timing = 0, //< Someone interfered with the unlocking
error = -1 //< The unlocking would over-unlock the semaphore
};
/// We try locking until we succeed
template<T increment = default_increment>
void lock() {
while(not try_lock<increment>());
}
/// For locking, we try to atomically increment the counter while maintaining it below the set limit
template<T increment = default_increment>
[[nodiscard]] bool try_lock() {
T expect = flag.load(std::memory_order::acquire);
T target = expect + increment;
if(target > maximum_increment) return false;
return flag.compare_exchange_strong(expect,target,std::memory_order::release);
}
/// Similarly to locking, we try unlocking until we succeed (or reach an invalid state)
template<T increment = default_increment>
void unlock() {
tristate v;
do{ v = try_unlock<increment>(); }
while(v == tristate::timing);
if(v != tristate::success) {
throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue");
}
}
/// Unlocking is the reverse of locking, we have to ensure to return an error if we try to go below zero
template<T increment = default_increment>
[[nodiscard]] tristate try_unlock() {
T expect = flag.load(std::memory_order::relaxed);
T target = expect - increment;
if(target < 0) return tristate::error;
return flag.compare_exchange_strong(expect,target,std::memory_order::release) ? tristate::success : tristate::timing;
}
};
using rw_lock_type = fast_semaphore<int32_t, 256, 256>;
class lock_handler_read {
rw_lock_type& ref;
explicit lock_handler_read(rw_lock_type& _ref) : ref(_ref) {
while(ref.try_lock<1>());
}
~lock_handler_read() {
while(ref.try_unlock<1>() != rw_lock_type::tristate::success);
}
};
class lock_handler_write {
rw_lock_type& ref;
explicit lock_handler_write(rw_lock_type& _ref) : ref(_ref) {
while(ref.try_lock());
}
~lock_handler_write() {
while(ref.try_unlock() != rw_lock_type::tristate::success);
}
};
static fast_semaphore<int32_t, 256, 256> registry_rw_lock;
struct registry_slab {
int id;
std::string name;
std::function<token_t(size_t)> reserve_write;
std::function<token_t(size_t)> reserve_write_c_align;
std::function<void(token_t)> conclude_write;
std::any disruptor;
};
/**
* @internal used because we need the pointer stability
* @see sl_transaction
*/
static std::unordered_map<int, registry_slab> registry_map;
BufferStrategyInternal::buffer_type BufferStrategyInternal::build_buffer(size_t) {
return {};
}
BufferStrategyShared::buffer_type BufferStrategyShared::build_buffer(size_t) {
return {};
}
BufferStrategyExternal::buffer_type BufferStrategyExternal::build_buffer(size_t) {
return {};
}
void SinkStrategyDirect::write(int fd, std::string_view data) {
}
void SinkStrategyFastest::write(int fd, std::string_view data) {
}
void SinkStrategyMmaped::write(int fd, std::string_view data) {
}
void SinkStrategyExternal::write(int fd, std::string_view data) {
}
void OverflowStrategyWait::wait() {
}
void OverflowStrategyContinue::wait() {
}
std::pair<std::string_view, std::string_view> OutputStrategyTimed::chunk(std::string_view) {
return {};
}
int OutputStrategyTimed::on_write_completed_event(std::string_view, int) {
return 0;
}
std::pair<std::string_view, std::string_view> OutputStrategySized::chunk(std::string_view) {
return {};
}
int OutputStrategySized::on_write_completed_event(std::string_view, int) {
return 0;
}
std::pair<std::string_view, std::string_view> OutputStrategySimple::chunk(std::string_view) {
return {};
}
int OutputStrategySimple::on_write_completed_event(std::string_view, int) {
return 0;
}

+ 1
- 0
Tests/CMakeLists.txt Voir le fichier

@ -11,6 +11,7 @@ FetchContent_MakeAvailable(Catch2)
add_executable(tests disruptor.cpp)
target_link_libraries(tests PRIVATE LibSnugLog)
target_link_libraries(tests PRIVATE Catch2::Catch2WithMain)
include_directories(../LibSnugLog/public_include)
include(CTest)
include(Catch)

+ 79
- 1
Tests/disruptor.cpp Voir le fichier

@ -1,3 +1,4 @@
#include <thread>
#include "catch2/catch_all.hpp"
#include "../LibSnugLog/include/disruptor.h"
@ -49,7 +50,7 @@ TEST_CASE("Disruptor works sequentially") {
}
v.conclude_write(W);
auto R = v.reserve_read();
for(size_t idx = W.start; idx != W.end; idx = (idx+1)%v.size()) {
for(size_t idx = R.start; idx != R.end; idx = (idx+1)%v.size()) {
mset.insert(v[idx]);
}
v.conclude_read(R);
@ -58,4 +59,81 @@ TEST_CASE("Disruptor works sequentially") {
REQUIRE(mset.count((char)i) == 100);
}
}
SECTION("Disruptor concurrent odd vs even") {
std::atomic<bool> trigger = false;
std::multiset<char> mset;
std::stringstream continuity;
int acc = 0;
for(int i = 0; i<= 255; i++) {
acc+=i;
}
std::thread reader([&](){
int cnt = 0;
while (cnt != acc) {
auto R = v.reserve_read();
for (size_t idx = R.start; idx != R.end; idx = (idx + 1) % v.size()) {
mset.insert(v[idx]);
continuity << (char)v[idx];
}
v.conclude_read(R);
cnt += (R.end > R.start) * (R.end - R.start)
+ (R.end < R.start) * (v.size() - R.start + R.end);
}
});
std::thread even([&]() {
while(!trigger.load());
for (int i = 2; i <= 255; i += 2) {
auto W = v.reserve_write(i);
v[W.start] = (char) i;
for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) {
v[idx] = (char) i;
}
v.conclude_write(W);
}
});
std::thread odd([&]() {
while(!trigger.load());
for (int i = 1; i <= 255; i += 2) {
auto W = v.reserve_write(i);
v[W.start] = (char) i;
for (size_t idx = W.start; idx != W.end; idx = (idx + 1) % v.size()) {
v[idx] = (char) i;
}
v.conclude_write(W);
}
});
// byte received count test
trigger.store(true);
reader.join(); even.join(); odd.join();
for(int i = 1; i <= 255; i++) {
REQUIRE(mset.count((char)i) == i);
}
// Continuity tests
int changes = 0;
auto str = continuity.str();
char current = *str.begin();
auto it = str.begin();
for(;it != str.end();) {
while(it != str.end() && *it == current) {++it;}
changes += 1;
current = *it;
}
REQUIRE(changes == 255);
}
}
TEST_CASE("Fails if buffer too small") {
REQUIRE_THROWS_AS(disruptor<OverflowWait>(nullptr, page_size), disruptor_exception);
}
TEST_CASE("Fails if buffer size is 0") {
REQUIRE_THROWS_AS(disruptor<OverflowWait>(nullptr, 0), disruptor_exception);
}

Chargement…
Annuler
Enregistrer