A C++ library for very fast, allocation-free logging.


#include "registry.h"
#include "disruptor.h"
#include "sl/register.h"
#include <concepts>
#include <atomic>
#include <unordered_map>
#include <functional>
#include <any>
#include <memory>
#include <thread>
#include <future>
#include <fstream>
#include <stdexcept>
#include "nlohmann/json.hpp"
#if defined(__linux__) || defined(__linux)
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#endif
/**
 * @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding
 * (a usage sketch follows the class definition)
 */
template<std::integral T, T default_increment, T maximum_increment>
class fast_semaphore {
    std::atomic<T> flag; ///< This is our counter
public:
    fast_semaphore() = default;
    fast_semaphore(fast_semaphore&) = delete;
    fast_semaphore(fast_semaphore&&) = delete;
    /// 3 things may happen when trying to unlock
    enum class tristate {
        success = 1,  ///< The unlocking was successful
        timing = 0,   ///< Someone interfered with the unlocking
        error = -1    ///< The unlocking would over-unlock the semaphore
    };
    /// We try locking until we succeed
    template<T increment = default_increment>
    void lock() {
        while(not try_lock<increment>());
    }
    /// For locking, we try to atomically increment the counter without exceeding the set limit
    template<T increment = default_increment>
    [[nodiscard]] bool try_lock() {
        T expect = flag.load(std::memory_order::acquire);
        T target = expect + increment;
        if(target > maximum_increment) return false;
        return flag.compare_exchange_strong(expect, target, std::memory_order::release);
    }
    /// Similarly to locking, we try unlocking until we succeed (or reach an invalid state)
    template<T increment = default_increment>
    void unlock() {
        tristate v;
        do { v = try_unlock<increment>(); }
        while(v == tristate::timing);
        if(v != tristate::success) {
            throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue");
        }
    }
    /// Unlocking is the reverse of locking; we must return an error if the counter would go below zero
    template<T increment = default_increment>
    [[nodiscard]] tristate try_unlock() {
        T expect = flag.load(std::memory_order::relaxed);
        T target = expect - increment;
        if(target < 0) return tristate::error;
        return flag.compare_exchange_strong(expect, target, std::memory_order::release) ? tristate::success : tristate::timing;
    }
};
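// Usage sketch (illustrative only, not part of the build): the counting semaphore
// doubles as a reader/writer lock when readers lock with an increment of 1 and
// writers lock with the full maximum_increment, so a writer can only enter while
// no reader holds the counter. The RAII handlers below wrap exactly this pattern.
//
//     fast_semaphore<int32_t, 256, 256> guard;
//     guard.lock<1>();     // shared: up to 256 concurrent readers
//     guard.unlock<1>();
//     guard.lock();        // exclusive: succeeds only when the counter is 0
//     guard.unlock();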
using rw_lock_type = fast_semaphore<int32_t, 256, 256>;
class lock_handler_read {
    rw_lock_type& ref;
public:
    explicit lock_handler_read(rw_lock_type& _ref) : ref(_ref) {
        // Spin until we acquire a shared (reader) slot.
        while(not ref.try_lock<1>());
    }
    ~lock_handler_read() {
        while(ref.try_unlock<1>() != rw_lock_type::tristate::success);
    }
};
class lock_handler_write {
    rw_lock_type& ref;
public:
    explicit lock_handler_write(rw_lock_type& _ref) : ref(_ref) {
        // Spin until we acquire the whole counter (exclusive/writer access).
        while(not ref.try_lock());
    }
    ~lock_handler_write() {
        while(ref.try_unlock() != rw_lock_type::tristate::success);
    }
};
static rw_lock_type registry_rw_lock;
std::unordered_map<int, registry_slab> registry_map;
template<BufferStrategy buff_strat, SinkStrategy sink_strat, OverflowStrategy of_strat, OutputStrategy out_strat>
void register_log_impl(int log_id, std::string_view log_name, std::string_view filename, uint64_t buffer_size, uint64_t out_strategy_parameter, std::string_view output_directory) {
    // NOTE: sink_strat, out_strat, out_strategy_parameter and output_directory are not yet used in this translation unit.
    if(buffer_size % page_size != 0) {
        // TODO: more meaningful exception
        throw disruptor_exception{"Size provided for the disruptor doesn't align with page size"};
    }
    lock_handler_write lock_me{registry_rw_lock};
    if(registry_map.contains(log_id)) {
        // TODO: more meaningful exception
        throw disruptor_exception{"Log ID already exists"};
    }
    registry_map.insert(std::pair<int, registry_slab>{log_id, {}});
    auto& slab = registry_map[log_id];
    slab.id = log_id;
    slab.name = log_name;
    // The raw pointer is stored in the slab and captured by the type-erased callbacks below.
    auto* transient_reference = new disruptor<of_strat>(buff_strat{}, filename, buffer_size);
    slab.disruptor = transient_reference;
    slab.reserve_write = [ptr = transient_reference](size_t sz) -> token_t {
        return ptr->reserve_write(sz);
    };
    slab.reserve_write_c_align = [ptr = transient_reference](size_t sz) -> token_t {
        return ptr->reserve_write(sz, force_contiguous_mode);
    };
    slab.conclude_write = [ptr = transient_reference](token_t tok) -> void {
        ptr->conclude_write(tok);
    };
    slab.get_buffer = [ptr = transient_reference](token_t tok) -> write_span {
        return write_span(tok, *ptr);
    };
}
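// Example instantiation (a sketch, not part of the build; the concrete strategy
// combination and the buffer size multiple are assumptions for illustration,
// picked from the strategy types defined further down in this file):
//
//     register_log_impl<BufferStrategyInternal, SinkStrategyDirect,
//                       OverflowStrategyWait, OutputStrategySimple>(
//         /*log_id*/ 1, /*log_name*/ "main", /*filename*/ "",
//         /*buffer_size*/ 16 * page_size, /*out_strategy_parameter*/ 0,
//         /*output_directory*/ "./logs");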
BufferStrategyInternal::buffer_type BufferStrategyInternal::build_buffer(std::string_view, size_t sz) {
    return std::vector<char>(sz, 0);
}
BufferStrategyShared::buffer_type BufferStrategyShared::build_buffer(std::string_view, size_t sz) {
#if defined(__linux__) || defined(__linux)
    auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
    // mmap signals failure with MAP_FAILED, not a null pointer.
    if(buff == MAP_FAILED) throw disruptor_exception{"Could not allocate memory for BufferStrategyShared"};
    return {(char*)buff, sz};
#else
    static_assert(false, "BufferStrategyShared strategy is unimplemented on your system");
#endif
}
BufferStrategyExternal::buffer_type BufferStrategyExternal::build_buffer(std::string_view filename, size_t sz) {
#if defined(__linux__) || defined(__linux)
    // The file must be opened read/write for a PROT_WRITE | MAP_SHARED mapping to succeed.
    auto fd = open(std::string{filename}.c_str(), O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
    if(fd < 0) throw disruptor_exception{"Could not open or create file for BufferStrategyExternal"};
    if(ftruncate(fd, sz) != 0) throw disruptor_exception{"Could not ensure size for the file for BufferStrategyExternal"};
    auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
    if(buff == MAP_FAILED) {
        throw disruptor_exception{"Could not allocate memory for BufferStrategyExternal, mapping failed"};
    }
    return {(char*)buff, sz};
#else
    static_assert(false, "BufferStrategyExternal strategy is unimplemented on your system");
#endif
}
void SinkStrategyDirect::write(int fd, std::string_view data) {
    do {
        auto written = ::write(fd, data.data(), data.size());
        if(written < 0) {
            throw std::runtime_error("POSIX write failed");
        } else if(written == 0) {
            throw std::runtime_error("POSIX write made no progress");
        }
        // Drop the bytes that were written and retry with the remainder.
        data.remove_prefix(written);
    } while(not data.empty());
}
void SinkStrategyFastest::write(int fd, std::string_view data) {
}
void SinkStrategyMmaped::write(int fd, std::string_view data) {
}
void SinkStrategyExternal::write(int fd, std::string_view data) {
}
void OverflowStrategyWait::wait() {
    std::this_thread::yield();
}
void OverflowStrategyContinue::wait() {
}
std::pair<std::string_view, std::string_view> OutputStrategyTimed::chunk(std::string_view) {
    return {};
}
int OutputStrategyTimed::on_write_completed_event(std::string_view, int) {
    return 0;
}
std::pair<std::string_view, std::string_view> OutputStrategySized::chunk(std::string_view) {
    return {};
}
int OutputStrategySized::on_write_completed_event(std::string_view, int) {
    return 0;
}
std::pair<std::string_view, std::string_view> OutputStrategySimple::chunk(std::string_view) {
    return {};
}
int OutputStrategySimple::on_write_completed_event(std::string_view, int) {
    return 0;
}
static nlohmann::json type_check_log_config(nlohmann::json&& data) {
    if(not data.is_object()) return {};
    if(not data.contains("on_overflow") or not data["on_overflow"].is_string()) return {};
    if(not data.contains("buffer_type")) return {};
    // Any buffer type other than "internal" is file backed and therefore needs a filename.
    if(auto& type = data.at("buffer_type"); not (type.is_string() and type.get<std::string>() == "internal") and not data.contains("buffer_filename")) return {};
    return data;
}
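// Example configuration entry accepted by the checks above (a sketch; the string
// values are illustrative assumptions, only the key names and the rule that a
// non-"internal" buffer_type needs a buffer_filename come from the code):
//
//     {
//         "on_overflow": "wait",
//         "buffer_type": "external",
//         "buffer_filename": "/tmp/fastlog.buffer"
//     }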
static void process_one(const nlohmann::json& data) {
}
void sl::process_log_config(const std::string_view &filename) {
    std::ifstream config{std::string(filename)};
    nlohmann::json data;
    config >> data;
    if(data.is_array()) {
        for(auto& elem : data) {
            elem = type_check_log_config(std::move(elem));
            if(elem.empty()) {
                // TODO: handle error
            }
            process_one(elem);
        }
    } else if (data.is_object()) {
        data = type_check_log_config(std::move(data));
        if(data.empty()) {
            // TODO: handle error
        }
        process_one(data);
    } else {
        // TODO: handle error
    }
}