A C++ library for very fast, allocation-free logging.
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

205 linhas
6.3 KiB

#pragma once
// Standard library
#include <algorithm>   // std::max (interference-size computation below)
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <new>
#include <optional>
#include <stdexcept>   // std::runtime_error (disruptor_exception)
#include <type_traits> // std::is_nothrow_invocable_v (noexcept specifications)
#ifdef _MSC_VER
#include <intrin.h>    // __nop
#endif
// Project
#include "sl/strategies.h"
#ifdef __cpp_lib_hardware_interference_size
// Use the implementation-reported cache-line geometry when available.
// max_interference_size spaces the control atomics apart (false-sharing
// avoidance); line_length is the constructive (same-line) granularity.
static constexpr size_t max_interference_size = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size);
static constexpr size_t line_length = std::hardware_constructive_interference_size;
#else
// Fallback values for toolchains without the C++17 feature-test macro:
// 128/64 bytes cover common x86-64 prefetch and cache-line sizes.
static constexpr size_t max_interference_size = 128;
static constexpr size_t line_length = 64;
#endif
// An atomic aligned (and therefore padded) to max_interference_size so that
// adjacent instances never share a cache line — prevents false sharing
// between independently-updated counters.
template<typename T>
struct alignas(max_interference_size) padded_atomic final {
std::atomic<T> value;
};
// Byte offset into the disruptor's data region.
using offset_t = size_t;
// OS page size; defined in exactly one translation unit of the library.
extern const size_t page_size;
// Tag type: request a reservation that must not wrap around the ring edge.
struct force_contiguous_mode {};
// Offset range granted by a reserve_* call; presumably half-open
// [start, end) — conclude_* advances the matching trailer from start to end.
struct token_t {offset_t start; offset_t end;};
  24. struct disruptor_exception : public std::runtime_error {
  25. disruptor_exception() : std::runtime_error("Unknown error disruptor") {}
  26. explicit disruptor_exception(const char* str) : std::runtime_error(str) {}
  27. };
  28. template<typename OverflowStrategyType>
  29. #ifdef __cpp_concepts
  30. requires OverflowStrategy<OverflowStrategyType>
  31. #endif
  32. class disruptor
  33. {
  34. char* buffer;
  35. size_t buffer_size;
  36. std::atomic<offset_t>& read_trailer;
  37. std::atomic<offset_t>& read_lead;
  38. std::atomic<offset_t>& write_trailer;
  39. std::atomic<offset_t>& write_lead;
  40. offset_t& max_offset;
  41. char* data_buffer;
  42. public:
  43. disruptor(char* _buffer, const size_t _buffer_size)
  44. : buffer(_buffer)
  45. , buffer_size(_buffer_size)
  46. , read_trailer (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*0))
  47. , read_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*1))
  48. , write_trailer(*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*2))
  49. , write_lead (*reinterpret_cast<std::atomic<offset_t>*>(_buffer+max_interference_size*3))
  50. , max_offset (*reinterpret_cast<offset_t*> (_buffer+max_interference_size*4))
  51. {
  52. if(buffer_size <= page_size) throw disruptor_exception("buffer size too small to build a disruptor");
  53. data_buffer = buffer+max_interference_size*5;
  54. if(data_buffer <= buffer+page_size) {
  55. data_buffer = buffer+page_size;
  56. max_offset = buffer_size - page_size;
  57. } else if(data_buffer > buffer+page_size) {
  58. size_t ctr = 0;
  59. do {
  60. ctr++;
  61. } while(data_buffer > buffer+page_size*ctr);
  62. data_buffer = buffer+page_size*ctr;
  63. max_offset = buffer_size - page_size*ctr;
  64. }
  65. }
  66. template<bool must_be_contiguous = false>
  67. std::optional<token_t> try_advance(
  68. std::atomic<offset_t>& lead,
  69. std::atomic<offset_t>& fence,
  70. offset_t amount
  71. ) {
  72. offset_t old_offset = lead.load(std::memory_order_relaxed);
  73. offset_t new_offset = old_offset + amount;
  74. offset_t fence_v = fence.load(std::memory_order_relaxed);
  75. // Check if we jumped the fence
  76. if(fence_v <= new_offset && fence_v > old_offset) {
  77. goto handle_fence;
  78. }
  79. // Check if we jumped the fence while overflowing
  80. if(new_offset >= max_offset) {
  81. if constexpr(must_be_contiguous) {
  82. new_offset = amount;
  83. } else {
  84. new_offset %= max_offset;
  85. }
  86. if(fence_v <= new_offset) {
  87. goto handle_fence;
  88. }
  89. }
  90. goto handle_fence_end;
  91. handle_fence:
  92. if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_wait) {
  93. return std::nullopt;
  94. } else if constexpr (OverflowStrategyType::on_overflow == overflow_response_t::must_overflow) {
  95. if(!fence.compare_exchange_weak(fence_v, new_offset, std::memory_order_release, std::memory_order_relaxed)) {
  96. return std::nullopt;
  97. } else {
  98. offset_t v;
  99. do {
  100. v = read_lead.load(std::memory_order_acquire);
  101. if(v >= new_offset) break;
  102. } while(read_lead.compare_exchange_weak(v, new_offset, std::memory_order_release, std::memory_order_relaxed));
  103. }
  104. } else {
  105. static_assert(
  106. OverflowStrategyType::on_overflow == overflow_response_t::must_wait
  107. || OverflowStrategyType::on_overflow == overflow_response_t::must_overflow
  108. );
  109. }
  110. handle_fence_end:
  111. if(!lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
  112. return std::nullopt;
  113. }
  114. return token_t{old_offset, new_offset};
  115. }
  116. token_t reserve_write(size_t sz) {
  117. std::optional<token_t> tok;
  118. OverflowStrategyType waiter;
  119. while(true) {
  120. tok = try_advance(write_lead, read_trailer, sz);
  121. if(tok) break;
  122. waiter.wait();
  123. }
  124. // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
  125. return tok.value();
  126. }
  127. token_t reserve_write(size_t sz, force_contiguous_mode) {
  128. std::optional<token_t> tok;
  129. OverflowStrategyType waiter;
  130. while(true) {
  131. tok = try_advance<true>(write_lead, read_trailer, sz);
  132. if(tok) break;
  133. waiter.wait();
  134. }
  135. // std::cout << tok.value().start << " rw " << tok.value().end << std::endl;
  136. return tok.value();
  137. }
  138. static constexpr const auto& tmp_fn = &OverflowStrategyType::wait;
  139. void conclude_write(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
  140. OverflowStrategyType waiter;
  141. while(!write_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
  142. waiter.wait();
  143. }
  144. }
  145. token_t reserve_read() {
  146. offset_t old_offset = read_lead.load(std::memory_order_relaxed);
  147. offset_t new_offset = write_trailer.load(std::memory_order_relaxed);
  148. if(old_offset > new_offset) new_offset = 0;
  149. OverflowStrategyType waiter;
  150. while(!read_lead.compare_exchange_weak(old_offset, new_offset, std::memory_order_acquire, std::memory_order_relaxed)) {
  151. waiter.wait();
  152. }
  153. // std::cout << old_offset << " rr " << new_offset << std::endl;
  154. return token_t{old_offset, new_offset};
  155. }
  156. void conclude_read(token_t tok) noexcept(std::is_nothrow_invocable_v<decltype(tmp_fn), OverflowStrategyType>) {
  157. OverflowStrategyType waiter;
  158. while(!read_trailer.compare_exchange_weak(tok.start, tok.end, std::memory_order_release, std::memory_order_relaxed)) {
  159. waiter.wait();
  160. }
  161. // std::cout << tok.start << " cr " << tok.end << std::endl;
  162. }
  163. char& operator[](size_t offset) {
  164. return data_buffer[offset];
  165. }
  166. [[nodiscard]] offset_t size() const {
  167. return max_offset;
  168. }
  169. };
  170. struct OverflowWait {
  171. static constexpr overflow_response_t on_overflow = overflow_response_t::must_wait;
  172. void wait() {
  173. #if defined(__clang__) || defined(__GNUC__)
  174. __asm__("nop\n\t");
  175. #elif _MSC_VER
  176. __nop;
  177. #endif
  178. }
  179. };