A C++ library for very fast, allocation-free logging.


#pragma once
#include <cstddef>
#include <atomic>
#include <new>
#include <cassert>
#include <optional>
#include <iostream>
#include <any>
#include <stdexcept>    // std::runtime_error
#include <string_view>  // std::string_view
#include <type_traits>  // std::is_nothrow_invocable_v
#include "sl/strategies.h"
#ifndef FORCE_HW_INTERFERENCE
#define FORCE_HW_INTERFERENCE 64
#endif
#ifndef FORCE_LINE_LENGTH
#define FORCE_LINE_LENGTH 128
#endif
static constexpr size_t max_interference_size = FORCE_HW_INTERFERENCE;
static constexpr size_t line_length = FORCE_LINE_LENGTH;
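// Pads a single atomic out to a full cache line so that independent cursors
// never share a line (avoids false sharing between producer and consumer).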
template<typename T>
struct alignas(max_interference_size) padded_atomic final {
    std::atomic<T> value;
};
using offset_t = size_t;
extern const size_t page_size;
struct force_contiguous_mode_t {};
inline constexpr force_contiguous_mode_t force_contiguous_mode;
struct token_t { offset_t start; offset_t end; };
struct disruptor_exception : public std::runtime_error {
    disruptor_exception() : std::runtime_error("unknown disruptor error") {}
    explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
};
template<typename OverflowStrategyType>
#ifdef __cpp_concepts
requires OverflowStrategy<OverflowStrategyType>
#endif
class disruptor
{
    char* buffer;
    size_t buffer_size;
    std::atomic<offset_t>* read_trailer;
    std::atomic<offset_t>* read_lead;
    std::atomic<offset_t>* write_trailer;
    std::atomic<offset_t>* write_lead;
    offset_t* max_offset;
    char* data_buffer;
    std::any buffer_life_extender;   // keeps the strategy-provided buffer alive
    std::any strategy_life_extender; // keeps the buffer strategy itself alive
public:
    /**
     * Constructs a disruptor from the provided strategy; initialization_and_checks() is responsible for the initialization.
     * @param strategy The buffer strategy. It is preserved intact, provided it is properly movable and moving it is pointer-stable; the buffer it returns is preserved under the same assumption.
     * @param _buffer_filename The file backing the buffer, forwarded to the strategy's build_buffer().
     * @param _buffer_size The total size of the buffer to allocate; keep in mind this includes not only the data region but also the leading control page(s) holding the cursors.
     */
    template<BufferStrategy buffer_strategy>
    disruptor(buffer_strategy strategy, std::string_view _buffer_filename, const size_t _buffer_size)
        : read_trailer{}
        , read_lead{}
        , write_trailer{}
        , write_lead{}
    {
        auto tmp = strategy.build_buffer(_buffer_filename, _buffer_size);
        buffer = tmp.data();
        buffer_size = tmp.size();
        read_trailer  = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 0);
        read_lead     = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 1);
        write_trailer = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 2);
        write_lead    = reinterpret_cast<std::atomic<offset_t>*>(buffer + max_interference_size * 3);
        max_offset    = reinterpret_cast<offset_t*>             (buffer + max_interference_size * 4);
        buffer_life_extender = std::move(tmp);
        strategy_life_extender = std::move(strategy);
        initialization_and_checks();
    }
    /**
     * Constructs a disruptor from the provided memory span; initialization_and_checks() is responsible for the initialization.
     * @ref initialization_and_checks()
     * @param _buffer Caller-owned memory backing the disruptor; it must outlive this object.
     * @param _buffer_size The total size of that memory, control page(s) included.
     */
    disruptor(char* _buffer, const size_t _buffer_size)
        : buffer(_buffer)
        , buffer_size(_buffer_size)
        , read_trailer (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 0))
        , read_lead    (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 1))
        , write_trailer(reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 2))
        , write_lead   (reinterpret_cast<std::atomic<offset_t>*>(_buffer + max_interference_size * 3))
        , max_offset   (reinterpret_cast<offset_t*>             (_buffer + max_interference_size * 4))
    {
        initialization_and_checks();
    }
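    // Buffer layout: the first max_interference_size*5 bytes hold the four
    // cursors plus max_offset, one cache line each; the data region then
    // starts at the next page boundary, so cursors and data never share a page.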
    void initialization_and_checks() {
        if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
        data_buffer = buffer + max_interference_size * 5;
        if(data_buffer <= buffer + page_size) {
            // The control block fits in one page: data starts at the second page.
            data_buffer = buffer + page_size;
            *max_offset = buffer_size - page_size;
        } else {
            // Otherwise round the data start up to the next page boundary.
            size_t ctr = 0;
            do {
                ctr++;
            } while(data_buffer > buffer + page_size * ctr);
            data_buffer = buffer + page_size * ctr;
            *max_offset = buffer_size - page_size * ctr;
        }
    }
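    /**
     * Attempts to move `lead` forward by `amount` without letting it cross
     * `fence` (the opposing trailer). Offsets wrap modulo max_offset, or
     * restart from 0 when must_be_contiguous is set so the claimed region
     * never straddles the wrap point. Returns the claimed token, or
     * std::nullopt when the fence is hit (under must_wait) or a CAS loses a
     * race and the caller should retry.
     */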
    template<bool must_be_contiguous = false>
    std::optional<token_t> try_advance(
        std::atomic<offset_t>& lead,
        std::atomic<offset_t>& fence,
        offset_t amount
    ) {
        offset_t old_offset = lead.load(std::memory_order_seq_cst);
        offset_t new_offset = old_offset + amount;
        offset_t fence_v = fence.load(std::memory_order_seq_cst);
        // Check if we jumped the fence
        if(fence_v <= new_offset && fence_v > old_offset) {
            goto handle_fence;
        }
        // Check if we jumped the fence while overflowing
        if(new_offset >= *max_offset) {
            if constexpr(must_be_contiguous) {
                new_offset = amount;
            } else {
                new_offset %= *max_offset;
            }
            if(fence_v <= new_offset) {
                goto handle_fence;
            }
        }
        goto handle_fence_end;
    handle_fence:
        if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
            return std::nullopt;
        } else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
            if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_seq_cst)) {
                return std::nullopt;
            } else {
                // Drag read_lead up to the new fence so readers cannot observe
                // offsets that were just overwritten; retry until it is at
                // least new_offset.
                offset_t v;
                do {
                    v = read_lead->load(std::memory_order_seq_cst);
                    if(v >= new_offset) break;
                } while(!read_lead->compare_exchange_weak(v, new_offset, std::memory_order_seq_cst));
            }
        } else {
            static_assert(
                OverflowStrategyType::on_overflow == overflow_response_t::must_wait
                || OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
            );
        }
    handle_fence_end:
        if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            return std::nullopt;
        }
        return token_t{old_offset, new_offset};
    }
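    /**
     * Claims `sz` bytes for writing, spinning on the overflow strategy until
     * the region becomes available. The returned token must later be passed
     * to conclude_write(); the force_contiguous_mode overload additionally
     * guarantees the region does not wrap around the end of the buffer.
     */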
    token_t reserve_write(size_t sz) {
        std::optional<token_t> tok;
        OverflowStrategyType waiter;
        while(true) {
            tok = try_advance(*write_lead, *read_trailer, sz);
            if(tok) break;
            waiter.wait();
        }
        // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
        return tok.value();
    }
    token_t reserve_write(size_t sz, force_contiguous_mode_t) {
        std::optional<token_t> tok;
        OverflowStrategyType waiter;
        while(true) {
            tok = try_advance<true>(*write_lead, *read_trailer, sz);
            if(tok) break;
            waiter.wait();
        }
        // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
        return tok.value();
    }
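    // conclude_write/conclude_read are noexcept exactly when the strategy's
    // wait() is; tmp_fn exists only so the noexcept clause can name it.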
    static constexpr auto tmp_fn = &OverflowStrategyType::wait;
    void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
        OverflowStrategyType waiter;
        while(!write_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            waiter.wait();
        }
    }
    token_t reserve_read() {
        offset_t old_offset = read_lead->load(std::memory_order_seq_cst);
        offset_t new_offset = write_trailer->load(std::memory_order_seq_cst);
        // If the writer has wrapped behind the reader, read up to the wrap
        // point at offset 0 first.
        if(old_offset > new_offset) new_offset = 0;
        OverflowStrategyType waiter;
        while(!read_lead->compare_exchange_weak(old_offset, new_offset, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            waiter.wait();
        }
        // std::cout << old_offset << " rr " << new_offset << std::endl;
        return token_t{old_offset, new_offset};
    }
    void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
        OverflowStrategyType waiter;
        while(!read_trailer->compare_exchange_weak(tok.start, tok.end, std::memory_order_seq_cst, std::memory_order_seq_cst)) {
            waiter.wait();
        }
        // std::cout << tok.start << " cr " << tok.end << std::endl;
    }
    char& operator[](size_t offset) {
        return data_buffer[offset];
    }
    char* data() {
        return data_buffer;
    }
    [[nodiscard]] offset_t size() const {
        return *max_offset;
    }
};
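/**
 * A non-owning view over the region described by a token, with ring-aware
 * size accounting: when the region wraps past the end of the buffer, size()
 * still reports the full logical length.
 */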
class write_span {
    offset_t start, end;
    size_t target_sz;
    char* target_buffer;
public:
    template<typename of_strat>
    write_span(token_t tok, disruptor<of_strat>& _target)
        : start(tok.start)
        , end(tok.end)
        , target_sz(_target.size())
        , target_buffer(_target.data())
    {}
private:
    write_span(const write_span& src, size_t ltrim)
        : start((src.start + ltrim) % src.target_sz)
        , end(src.end)
        , target_sz(src.target_sz)
        , target_buffer(src.target_buffer)
    {}
public:
    char& front() {
        return target_buffer[start];
    }
    [[nodiscard]] size_t size() const {
        return start <= end ? end - start : end + target_sz - start;
    }
    write_span subspan(size_t ltrim) {
        if(ltrim > size()) throw disruptor_exception("write_span overflow, ltrim greater than available span");
        return write_span(*this, ltrim);
    }
    [[nodiscard]] bool empty() const {
        return end == start;
    }
};
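
For orientation, a minimal usage sketch (not part of the header). The spin_wait_strategy below is hypothetical: it only models what this header actually uses, a static on_overflow policy plus a wait() member; the real strategies, the OverflowStrategy concept, and overflow_response_t live in sl/strategies.h, and page_size is normally defined by the library itself.

#include <vector>
// #include "disruptor.h" // this header; file name assumed

// Hypothetical strategy satisfying what reserve_write()/conclude_write() need.
struct spin_wait_strategy {
    static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
    void wait() noexcept {} // a real strategy might yield or back off here
};

// const size_t page_size = 4096; // only if the library does not define it

int main() {
    // Zero-initialized, so all cursors start at 0; first page(s) = control
    // block, the rest is the data ring.
    std::vector<char> memory(1 << 20);
    disruptor<spin_wait_strategy> d(memory.data(), memory.size());

    const char msg[] = "hello";
    token_t wt = d.reserve_write(sizeof msg);      // claim a region
    for(size_t i = 0; i < sizeof msg; ++i)
        d[(wt.start + i) % d.size()] = msg[i];     // ring-aware fill
    d.conclude_write(wt);                          // publish to the reader

    token_t rt = d.reserve_read();                 // claim everything published
    write_span span(rt, d);
    char first = span.empty() ? '\0' : span.front(); // consume span.size() bytes
    d.conclude_read(rt);                           // hand the space back
    return first == 'h' ? 0 : 1;
}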