A C++ library for logging very fast and without allocating.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

258 lignes
7.8 KiB

  1. #include "registry.h"
  2. #include "disruptor.h"
  3. #include "sl/register.h"
  4. #include <concepts>
  5. #include <atomic>
  6. #include <unordered_map>
  7. #include <functional>
  8. #include <any>
  9. #include "nlohmann/json.hpp"
  10. #if defined(__linux__) || defined(__linux)
  11. #include <sys/mman.h>
  12. #include <fcntl.h>
  13. #include <memory>
  14. #include <thread>
  15. #include <future>
  16. #include <fstream>
  17. #endif
  18. /**
  19. * @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding
  20. */
  21. template<std::integral T, T default_increment, T maximum_increment>
  22. class fast_semaphore {
  23. std::atomic<T> flag; //< This is our counter
  24. public:
  25. fast_semaphore() = default;
  26. fast_semaphore(fast_semaphore&) = delete;
  27. fast_semaphore(fast_semaphore&&) = delete;
  28. /// 3 things may happen when trying to unlock
  29. enum class tristate {
  30. success = 1, //< The unlocking was successful
  31. timing = 0, //< Someone interfered with the unlocking
  32. error = -1 //< The unlocking would over-unlock the semaphore
  33. };
  34. /// We try locking until we succeed
  35. template<T increment = default_increment>
  36. void lock() {
  37. while(not try_lock<increment>());
  38. }
  39. /// For locking, we try to atomically increment the counter while maintaining it below the set limit
  40. template<T increment = default_increment>
  41. [[nodiscard]] bool try_lock() {
  42. T expect = flag.load(std::memory_order::acquire);
  43. T target = expect + increment;
  44. if(target > maximum_increment) return false;
  45. return flag.compare_exchange_strong(expect,target,std::memory_order::release);
  46. }
  47. /// Similarly to locking, we try unlocking until we succeed (or reach an invalid state)
  48. template<T increment = default_increment>
  49. void unlock() {
  50. tristate v;
  51. do{ v = try_unlock<increment>(); }
  52. while(v == tristate::timing);
  53. if(v != tristate::success) {
  54. throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue");
  55. }
  56. }
  57. /// Unlocking is the reverse of locking, we have to ensure to return an error if we try to go below zero
  58. template<T increment = default_increment>
  59. [[nodiscard]] tristate try_unlock() {
  60. T expect = flag.load(std::memory_order::relaxed);
  61. T target = expect - increment;
  62. if(target < 0) return tristate::error;
  63. return flag.compare_exchange_strong(expect,target,std::memory_order::release) ? tristate::success : tristate::timing;
  64. }
  65. };
  66. using rw_lock_type = fast_semaphore<int32_t, 256, 256>;
  67. class lock_handler_read {
  68. rw_lock_type& ref;
  69. public:
  70. explicit lock_handler_read(rw_lock_type& _ref) : ref(_ref) {
  71. while(ref.try_lock<1>());
  72. }
  73. ~lock_handler_read() {
  74. while(ref.try_unlock<1>() != rw_lock_type::tristate::success);
  75. }
  76. };
  77. class lock_handler_write {
  78. rw_lock_type& ref;
  79. public:
  80. explicit lock_handler_write(rw_lock_type& _ref) : ref(_ref) {
  81. while(ref.try_lock());
  82. }
  83. ~lock_handler_write() {
  84. while(ref.try_unlock() != rw_lock_type::tristate::success);
  85. }
  86. };
  87. static fast_semaphore<int32_t, 256, 256> registry_rw_lock;
  88. std::unordered_map<int, registry_slab> registry_map;
  89. template<BufferStrategy buff_strat, SinkStrategy sink_strat, OverflowStrategy of_strat, OutputStrategy out_strat>
  90. void register_log_impl(int log_id, std::string_view log_name, std::string_view filename, uint64_t buffer_size, uint64_t out_strategy_parameter, std::string_view output_directory) {
  91. if(buffer_size % page_size != 0) {
  92. // TODO: more meaningful exception
  93. throw disruptor_exception{"Size provided for the disruptor doesn't align with page size"};
  94. }
  95. lock_handler_write lock_me{registry_rw_lock};
  96. if(registry_map.contains(log_id)) {
  97. // TODO: more meaningful exception
  98. throw disruptor_exception{"Log ID already exists"};
  99. }
  100. registry_map.insert(std::pair<int, registry_slab>{log_id, {}});
  101. auto& slab = registry_map[log_id];
  102. slab.id = log_id;
  103. slab.name = log_name;
  104. auto* transient_reference = new disruptor<of_strat>(buff_strat{}, filename, buffer_size);
  105. slab.disruptor = transient_reference;
  106. slab.reserve_write = [ptr = transient_reference] (size_t sz) -> token_t{
  107. return ptr->reserve_write(sz);
  108. };
  109. slab.reserve_write_c_align = [ptr = transient_reference] (size_t sz) -> token_t{
  110. return ptr->reserve_write(sz, force_contiguous_mode);
  111. };
  112. slab.conclude_write = [ptr = transient_reference] (token_t tok) -> void {
  113. ptr->conclude_write(tok);
  114. };
  115. slab.get_buffer = [self = slab, ptr = transient_reference] (token_t tok) -> write_span {
  116. return write_span(tok, *ptr);
  117. };
  118. }
  119. BufferStrategyInternal::buffer_type BufferStrategyInternal::build_buffer(std::string_view, size_t sz) {
  120. return std::vector<char>(sz, 0);
  121. }
  122. BufferStrategyShared::buffer_type BufferStrategyShared::build_buffer(std::string_view, size_t sz) {
  123. #if defined(__linux__) || defined(__linux)
  124. auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
  125. if(not buff) throw disruptor_exception{"Could not allocate memory for BufferStrategyShared"};
  126. return {(char*)buff, sz};
  127. #else
  128. static_assert(false, "BufferStrategyShared strategy is unimplemented on your system");
  129. #endif
  130. }
  131. BufferStrategyExternal::buffer_type BufferStrategyExternal::build_buffer(std::string_view filename, size_t sz) {
  132. #if defined(__linux__) || defined(__linux)
  133. auto fd = open(std::string{filename}.c_str(), O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
  134. if(fd <= 0) throw disruptor_exception{"Could not open or create file for BufferStrategyExternal"};
  135. if(ftruncate(fd, sz) != 0) throw disruptor_exception{"Could not ensure size for the file for BufferStrategyExternal"};
  136. auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
  137. if(not buff) {
  138. throw disruptor_exception{"Could not allocate memory for BufferStrategyExternal, mapping failed"};
  139. }
  140. return {(char*)buff, sz};
  141. #else
  142. static_assert(false, "BufferStrategyExternal strategy is unimplemented on your system");
  143. #endif
  144. }
  145. void SinkStrategyDirect::write(int fd, std::string_view data) {
  146. do{
  147. auto written = ::write(fd, data.data(), data.size());
  148. if(written < 0) {
  149. throw std::runtime_error("POSIX write failed");
  150. } else if(written == 0) {
  151. throw std::runtime_error("POSIX write made no progress");
  152. }
  153. data.substr(written);
  154. } while(not data.empty());
  155. }
  156. void SinkStrategyFastest::write(int fd, std::string_view data) {
  157. }
  158. void SinkStrategyMmaped::write(int fd, std::string_view data) {
  159. }
  160. void SinkStrategyExternal::write(int fd, std::string_view data) {
  161. }
  162. void OverflowStrategyWait::wait() {
  163. std::this_thread::yield();
  164. }
  165. void OverflowStrategyContinue::wait() {
  166. }
  167. std::pair<std::string_view, std::string_view> OutputStrategyTimed::chunk(std::string_view) {
  168. return {};
  169. }
  170. int OutputStrategyTimed::on_write_completed_event(std::string_view, int) {
  171. return 0;
  172. }
  173. std::pair<std::string_view, std::string_view> OutputStrategySized::chunk(std::string_view) {
  174. return {};
  175. }
  176. int OutputStrategySized::on_write_completed_event(std::string_view, int) {
  177. return 0;
  178. }
  179. std::pair<std::string_view, std::string_view> OutputStrategySimple::chunk(std::string_view) {
  180. return {};
  181. }
  182. int OutputStrategySimple::on_write_completed_event(std::string_view, int) {
  183. return 0;
  184. }
  185. static nlohmann::json type_check_log_config(nlohmann::json&& data) {
  186. if(not data.is_object()) return {};
  187. if(not data.contains("on_overflow") and not data["on_overflow"].is_string()) return {};
  188. if(not data.contains("buffer_type")) return {};
  189. if(auto& type = data.at("buffer_type"); not (type.is_string() and type.get<std::string>() == "internal") and not data.contains("buffer_filename")) return {};
  190. }
  191. static void process_one(const nlohmann::json& data) {
  192. }
  193. void sl::process_log_config(const std::string_view &filename) {
  194. std::ifstream config{std::string(filename)};
  195. nlohmann::json data;
  196. config >> data;
  197. if(data.is_array()) {
  198. for(auto& elem : data) {
  199. elem = type_check_log_config(std::move(elem));
  200. if(elem.empty()) {
  201. // TODO: handle error
  202. }
  203. process_one(elem);
  204. }
  205. } else if (data.is_object()) {
  206. data = type_check_log_config(std::move(data));
  207. if(data.empty()) {
  208. // TODO: handle error
  209. }
  210. process_one(data);
  211. } else {
  212. // TODO: handle error
  213. }
  214. }