#include "registry.h" #include "disruptor.h" #include "sl/register.h" #include #include #include #include #include #include "nlohmann/json.hpp" #if defined(__linux__) || defined(__linux) #include #include #include #include #include #include #endif /** * @brief A fast semaphore exclusion handler WITHOUT deadlock detection or yielding */ template class fast_semaphore { std::atomic flag; //< This is our counter public: fast_semaphore() = default; fast_semaphore(fast_semaphore&) = delete; fast_semaphore(fast_semaphore&&) = delete; /// 3 things may happen when trying to unlock enum class tristate { success = 1, //< The unlocking was successful timing = 0, //< Someone interfered with the unlocking error = -1 //< The unlocking would over-unlock the semaphore }; /// We try locking until we succeed template void lock() { while(not try_lock()); } /// For locking, we try to atomically increment the counter while maintaining it below the set limit template [[nodiscard]] bool try_lock() { T expect = flag.load(std::memory_order::acquire); T target = expect + increment; if(target > maximum_increment) return false; return flag.compare_exchange_strong(expect,target,std::memory_order::release); } /// Similarly to locking, we try unlocking until we succeed (or reach an invalid state) template void unlock() { tristate v; do{ v = try_unlock(); } while(v == tristate::timing); if(v != tristate::success) { throw std::runtime_error("Over unlocking may have happened: potential double unlocking issue"); } } /// Unlocking is the reverse of locking, we have to ensure to return an error if we try to go below zero template [[nodiscard]] tristate try_unlock() { T expect = flag.load(std::memory_order::relaxed); T target = expect - increment; if(target < 0) return tristate::error; return flag.compare_exchange_strong(expect,target,std::memory_order::release) ? 
using rw_lock_type = fast_semaphore<std::int64_t, 256>;

class lock_handler_read {
    rw_lock_type& ref;
public:
    explicit lock_handler_read(rw_lock_type& _ref) : ref(_ref) {
        while(not ref.try_lock<1>());
    }
    ~lock_handler_read() {
        while(ref.try_unlock<1>() != rw_lock_type::tristate::success);
    }
};

class lock_handler_write {
    rw_lock_type& ref;
public:
    explicit lock_handler_write(rw_lock_type& _ref) : ref(_ref) {
        while(not ref.try_lock());
    }
    ~lock_handler_write() {
        while(ref.try_unlock() != rw_lock_type::tristate::success);
    }
};

static rw_lock_type registry_rw_lock;
std::unordered_map<int, registry_slab> registry_map;

template<typename buff_strat>
void register_log_impl(int log_id, std::string_view log_name, std::string_view filename,
                       uint64_t buffer_size, uint64_t out_strategy_parameter,
                       std::string_view output_directory) {
    if(buffer_size % page_size != 0) {
        // TODO: more meaningful exception
        throw disruptor_exception{"Size provided for the disruptor doesn't align with page size"};
    }

    lock_handler_write lock_me{registry_rw_lock};
    if(registry_map.contains(log_id)) {
        // TODO: more meaningful exception
        throw disruptor_exception{"Log ID already exists"};
    }

    registry_map.insert({log_id, {}});
    auto& slab = registry_map[log_id];
    slab.id = log_id;
    slab.name = log_name;

    auto* transient_reference = new disruptor(buff_strat{}, filename, buffer_size);
    slab.disruptor = transient_reference;
    slab.reserve_write = [ptr = transient_reference](size_t sz) -> token_t {
        return ptr->reserve_write(sz);
    };
    slab.reserve_write_c_align = [ptr = transient_reference](size_t sz) -> token_t {
        return ptr->reserve_write(sz, force_contiguous_mode);
    };
    slab.conclude_write = [ptr = transient_reference](token_t tok) -> void {
        ptr->conclude_write(tok);
    };
    slab.get_buffer = [self = slab, ptr = transient_reference](token_t tok) -> write_span {
        return write_span(tok, *ptr);
    };
}

BufferStrategyInternal::buffer_type BufferStrategyInternal::build_buffer(std::string_view, size_t sz) {
    return std::vector<char>(sz, 0);
}

BufferStrategyShared::buffer_type BufferStrategyShared::build_buffer(std::string_view, size_t sz) {
#if defined(__linux__) || defined(__linux)
    auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
    if(buff == MAP_FAILED) throw disruptor_exception{"Could not allocate memory for BufferStrategyShared"};
    return {(char*)buff, sz};
#else
    static_assert(false, "BufferStrategyShared strategy is unimplemented on your system");
#endif
}

BufferStrategyExternal::buffer_type BufferStrategyExternal::build_buffer(std::string_view filename, size_t sz) {
#if defined(__linux__) || defined(__linux)
    auto fd = open(std::string{filename}.c_str(), O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
    if(fd < 0) throw disruptor_exception{"Could not open or create file for BufferStrategyExternal"};
    if(ftruncate(fd, static_cast<off_t>(sz)) != 0) throw disruptor_exception{"Could not ensure size for the file for BufferStrategyExternal"};
    auto buff = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
    if(buff == MAP_FAILED) {
        throw disruptor_exception{"Could not allocate memory for BufferStrategyExternal, mapping failed"};
    }
    return {(char*)buff, sz};
#else
    static_assert(false, "BufferStrategyExternal strategy is unimplemented on your system");
#endif
}

void SinkStrategyDirect::write(int fd, std::string_view data) {
    do {
        auto written = ::write(fd, data.data(), data.size());
        if(written < 0) {
            throw std::runtime_error("POSIX write failed");
        } else if(written == 0) {
            throw std::runtime_error("POSIX write made no progress");
        }
        data.remove_prefix(static_cast<size_t>(written)); // drop the bytes already written, retry with the rest
    } while(not data.empty());
}
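// Minimal sketch of driving SinkStrategyDirect by hand (illustrative; it assumes the
// sink is default-constructible with write() as an ordinary member function, and the
// file path and payload are made up for the example). Short writes are handled by the
// loop above: remove_prefix() drops the bytes the kernel accepted and the remainder is
// retried until the view is empty.
//
//     int fd = ::open("/tmp/example.log", O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
//     SinkStrategyDirect{}.write(fd, "hello, disruptor\n");
//     ::close(fd);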
void SinkStrategyFastest::write(int, std::string_view) { }
void SinkStrategyMmaped::write(int, std::string_view) { }
void SinkStrategyExternal::write(int, std::string_view) { }

void OverflowStrategyWait::wait() { std::this_thread::yield(); }
void OverflowStrategyContinue::wait() { }

std::pair<std::string_view, std::string_view> OutputStrategyTimed::chunk(std::string_view) { return {}; }
int OutputStrategyTimed::on_write_completed_event(std::string_view, int) { return 0; }
std::pair<std::string_view, std::string_view> OutputStrategySized::chunk(std::string_view) { return {}; }
int OutputStrategySized::on_write_completed_event(std::string_view, int) { return 0; }
std::pair<std::string_view, std::string_view> OutputStrategySimple::chunk(std::string_view) { return {}; }
int OutputStrategySimple::on_write_completed_event(std::string_view, int) { return 0; }

static nlohmann::json type_check_log_config(nlohmann::json&& data) {
    if(not data.is_object()) return {};
    if(not data.contains("on_overflow") or not data.at("on_overflow").is_string()) return {};
    if(not data.contains("buffer_type")) return {};
    // Every buffer type except "internal" needs a backing file name
    if(auto& type = data.at("buffer_type");
       not (type.is_string() and type.get<std::string>() == "internal") and not data.contains("buffer_filename")) {
        return {};
    }
    return std::move(data);
}

static void process_one(const nlohmann::json& data) {
    // TODO: act on one validated log configuration entry
}

void sl::process_log_config(const std::string_view& filename) {
    std::ifstream config{std::string(filename)};
    nlohmann::json data;
    config >> data;

    if(data.is_array()) {
        for(auto& elem : data) {
            elem = type_check_log_config(std::move(elem));
            if(elem.empty()) {
                // TODO: handle error
            }
            process_one(elem);
        }
    } else if(data.is_object()) {
        data = type_check_log_config(std::move(data));
        if(data.empty()) {
            // TODO: handle error
        }
        process_one(data);
    } else {
        // TODO: handle error
    }
}
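// Example configuration accepted by sl::process_log_config, as far as
// type_check_log_config currently validates it: "on_overflow" must be a string,
// "buffer_type" must be present, and any buffer type other than "internal" also
// requires a "buffer_filename". The concrete values below are illustrative
// assumptions, not keywords the parser enforces yet.
//
//     [
//         { "on_overflow": "wait",     "buffer_type": "internal" },
//         { "on_overflow": "continue", "buffer_type": "external",
//           "buffer_filename": "/tmp/audit.ring" }
//     ]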