diff --git a/Makefile b/Makefile index 04a4b99..42b73d6 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,7 @@ $(OBJ_DIR)/%.o: %.cpp $(TARGET): $(OBJECTS) $(TJS_OBJECTS) build make -C CommandEr @mkdir -p $(@D) - $(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/$(TARGETNAME) src/$(TARGET) $(OBJECTS) $(TJS_OBJECTS) + $(CXX) $(CXXFLAGS) -pthread $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/$(TARGETNAME) src/$(TARGET) $(OBJECTS) $(TJS_OBJECTS) $(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/test_client src/test_client.cpp $(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/db_stats src/db_stats.cpp diff --git a/include/database.hpp b/include/database.hpp index a3a79ca..c1c1e8f 100644 --- a/include/database.hpp +++ b/include/database.hpp @@ -39,6 +39,21 @@ public: , delete_table{delete_cnt, deletions} {} + static void mark_empty_page(db_page& gojp, size_t idx) + { + auto& gojmark = *(std::array, db_page_size/sizeof(uint32_t)>*)&gojp; + bitops::regulated mark = 0; + std::array mark_array = {'G', 'o', 'J', '@'}; + mark.internal = *(uint32_t*)&mark_array; + uint32_t slice = static_cast(idx); + + for(size_t mpos = 0; mpos < gojmark.size(); mpos+=2) + { + gojmark[mpos] = mark; + gojmark[mpos+1] = slice++; + } + } + static database&& create(const std::string dir, size_t page_nb) { database ret{ dir+"records", @@ -50,16 +65,31 @@ public: page_nb/8 }; + std::cout << "Record table: "; for(auto& n : ret.records) { n.second.timestamp = 0; n.second.offset = 0; + n.second.flags = 0; } + std::cout << "DONE" << std::endl; + std::cout << "Delete table: "; for(auto& n : ret.delete_table) { n = std::numeric_limits::max(); } + std::cout << "DONE" << std::endl; + + for(size_t idx = 0; idx < ret.pages.size(); idx++) + { + mark_empty_page(ret.pages[idx], idx); + if(idx % 120000 == 0) + { + std::cout << "Page markings: " << uint16_t(100.0f*float(idx)/float(ret.pages.size())) << "%" << std::endl; + } + } + std::cout << "Page markings: DONE" << std::endl; (*ret.metadata).last_page = 0; (*ret.metadata).last_delete = 0; @@ -217,9 +247,10 @@ public: value.timestamp = std::numeric_limits::max(); (*metadata).last_delete+=1; delete_table[(*metadata).last_delete-1] = value.offset; + mark_empty_page(pages[value.offset], value.offset); value.offset = 0; - break; - } + } + break; } hashed_roll++; diff --git a/include/fsized_map.h b/include/fsized_map.h index c0437aa..824f6cc 100644 --- a/include/fsized_map.h +++ b/include/fsized_map.h @@ -63,7 +63,8 @@ struct [[gnu::packed]] record{ bitops::regulated flags = 0; }; -using db_page = std::array; +constexpr size_t db_page_size = 16384; +using db_page = std::array; template struct mmap_ptr{ @@ -170,6 +171,10 @@ class mmap_array : public mmap_ptr return item_size; } + int fd_v() const { + return fd; + } + constexpr mmap_ptr begin() const { mmap_ptr ret = *this; return ret; diff --git a/include/network.hpp b/include/network.hpp index eb2d280..cc480af 100644 --- a/include/network.hpp +++ b/include/network.hpp @@ -13,7 +13,8 @@ enum class db_op : uint32_t { swrite = 6, sallocate = 7, sremove = 3, - confirm = 8 + confirm = 8, + bulk_write = 9 }; struct [[gnu::packed]] received_data { @@ -23,6 +24,13 @@ struct [[gnu::packed]] received_data { db_page page = {0}; }; +struct [[gnu::packed]] received_bulk_data { + bitops::regulated op = db_op::version; + bitops::regulated rep_id = 0; + std::array identifiers; + std::array page; +}; + struct [[gnu::packed]] sending_data { bitops::regulated rep_id = 0; record identifier = record{}; @@ -36,4 +44,9 @@ struct [[gnu::packed]] stats_data { bitops::regulated total_records; bitops::regulated total_delete; bitops::regulated cow_full; + bitops::regulated free_records; + bitops::regulated sync_rate_µs; + bitops::regulated sync_duration_µs; + bitops::regulated max_sync_duration_µs; + bitops::regulated avg_sync_duration_µs; }; \ No newline at end of file diff --git a/src/db_stats.cpp b/src/db_stats.cpp index b3ad6e4..754eb80 100644 --- a/src/db_stats.cpp +++ b/src/db_stats.cpp @@ -58,13 +58,20 @@ int main(int argc, char** argv) std::cout << "free_pages\t" << stats->free << std::endl; std::cout << "deleted_pages\t" << stats->free_deleted << std::endl; std::cout << "total_pages\t" << stats->total_pages << std::endl; + std::cout << "usable_records\t" << stats->free_records << std::endl; std::cout << "total_records\t" << stats->total_records << std::endl; std::cout << "delete_cache\t" << stats->total_delete << std::endl; std::cout << "inaccessible\t" << stats->cow_full << std::endl; + std::cout << "sync_rate\t" << stats->sync_rate_µs << "µs (" << stats->sync_rate_µs/1000000.0f << "s)" << std::endl; + std::cout << "sync_durr\t" << stats->sync_duration_µs << "µs (" << stats->sync_duration_µs/1000000.0f << "s)" << std::endl; + std::cout << "max_sync_durr\t" << stats->max_sync_duration_µs << "µs (" << stats->max_sync_duration_µs/1000000.0f << "s)" << std::endl; + std::cout << "avg_sync_durr\t" << stats->avg_sync_duration_µs << "µs (" << stats->avg_sync_duration_µs/1000000.0f << "s)" << std::endl; return 0; } - }catch(...) {} + }catch(...) { + exit(13); + } std::cerr << "Invalid command, expects ``db_stats IPV4 PORT''" << std::endl; return 1; } \ No newline at end of file diff --git a/src/izaro-storage.cpp b/src/izaro-storage.cpp index d9a7507..af106fa 100644 --- a/src/izaro-storage.cpp +++ b/src/izaro-storage.cpp @@ -12,17 +12,77 @@ #include #include #include +#include +#include #include #include "commander.hpp" +constexpr auto record_sync_freq = std::chrono::microseconds(500000); +std::atomic last_cycle_ts; +std::atomic last_cycle_durr; +std::atomic max_cycle_durr; +std::atomic avg_cycle_durr; +std::atomic will_stop{false}; + +void sync_records_thread(const database& db) +{ + std::chrono::microseconds total{0}; + uint16_t count = 0; + do{ + auto begin = std::chrono::high_resolution_clock::now(); + fdatasync(db.records.fd_v()); + auto durr = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin); + total += durr; + count++; + auto avg = total/count; + if(count>20) + { + total -= avg; + count--; + } + avg_cycle_durr.store(avg); + last_cycle_durr.store(durr); + if(max_cycle_durr.load() < durr) + { + max_cycle_durr.store(durr); + } + if(durr +void buffer_cleanup(received_bulk_data& recv_impl) { + constexpr size_t total = ((size_t)mode)/(((size_t)mode % 8 == 0) ? 8 : 4); + + for( + size_t i = 0; + i < total; + i++ + ) + { + if constexpr ((size_t)mode % 8 == 0) + { + ((uint64_t*)&recv_impl)[i] = 0; + } else { + ((uint32_t*)&recv_impl)[i] = 0; + } + } +} + int main( [[maybe_unused]] int argc, [[maybe_unused]] char** argv ) { - - CMD::commander cmd_args(argc, argv); std::string database_str = "/data/"; @@ -179,17 +239,20 @@ int main( addr.sin_port = htons(db_port); memset((void*)&addr.sin_addr, 0, sizeof(addr.sin_addr)); bind(soc,(struct sockaddr*)&addr,sizeof(addr)); + std::thread syncher(sync_records_thread, run_db); + received_bulk_data recv_impl; + buffer_cleanup(recv_impl); do{ - received_data recv; + received_data& recv = *(received_data*)&recv_impl; sending_data reply; sockaddr_in client; socklen_t packet_sz; size_t reply_size; recvfrom( soc, - (void*)&recv, - sizeof(received_data), + (void*)&recv_impl, + sizeof(received_bulk_data), MSG_WAITFORONE, (struct sockaddr*)&client, &packet_sz @@ -203,6 +266,7 @@ int main( reply.rep_id = recv.rep_id; reply.identifier.record_head.split.x = 1; reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier.record_head.split.x); + buffer_cleanup(recv_impl); break; case db_op::read: { @@ -218,6 +282,7 @@ int main( { reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); } + buffer_cleanup(recv_impl); } break; case db_op::write: @@ -230,6 +295,7 @@ int main( std::cerr << "cluster overfull"<< std::endl; } reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); + buffer_cleanup(recv_impl); } break; // case db_op::sremove: @@ -239,6 +305,7 @@ int main( run_db.remove(recv.identifier); reply.identifier.record_head.split = recv.identifier; reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); + buffer_cleanup(recv_impl); } break; case db_op::stats: @@ -253,15 +320,25 @@ int main( + (*run_db.metadata).page_cnt - (*run_db.metadata).last_page; uint64_t cow_full = 0; + uint64_t usable_records = 0; for(auto& elem : run_db.records) { - if(elem.second.timestamp == std::numeric_limits::max()) + if (elem.second.timestamp == std::numeric_limits::max()) { ++cow_full; + ++usable_records; + } else if(elem.second.timestamp == 0) { + ++usable_records; } } stats->cow_full = cow_full; + stats->free_records = usable_records; + stats->sync_rate_µs = record_sync_freq.count(); + stats->sync_duration_µs = last_cycle_durr.load().count(); + stats->max_sync_duration_µs = max_cycle_durr.load().count(); + stats->avg_sync_duration_µs = avg_cycle_durr.load().count(); reply_size = sizeof(reply); + buffer_cleanup(recv_impl); } break; case db_op::sallocate: @@ -269,6 +346,7 @@ int main( reply.rep_id = recv.rep_id; reply.identifier = run_db.try_allocate(recv.identifier); reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); + buffer_cleanup(recv_impl); } break; case db_op::sread: @@ -285,6 +363,7 @@ int main( { reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); } + buffer_cleanup(recv_impl); } break; case db_op::swrite: @@ -297,6 +376,7 @@ int main( std::cerr << "cluster overfull"<< std::endl; } reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier); + buffer_cleanup(recv_impl); } break; case db_op::confirm: @@ -304,6 +384,7 @@ int main( reply.rep_id = recv.rep_id; run_db.confirm(recv.identifier, *(bitops::regulated*)&recv.page); reply_size = sizeof(reply.rep_id); + buffer_cleanup(recv_impl); } break; default: diff --git a/src/test_client.cpp b/src/test_client.cpp index 5d37373..94e181e 100644 --- a/src/test_client.cpp +++ b/src/test_client.cpp @@ -8,7 +8,11 @@ int main(int, char** argv) { { std::random_device hrng{}; - std::minstd_rand srng{hrng()}; + auto seed = hrng(); + + std::minstd_rand srng{seed}; + + std::minstd_rand cpyrng{seed}; auto soc = socket(AF_INET, SOCK_DGRAM, 0); struct sockaddr_in server; @@ -53,7 +57,35 @@ int main(int, char** argv) { auto durr = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin)/tot; - std::cerr << "durr/elem = " << durr.count() << "ns/op" << std::endl; + std::cerr << "w:durr/elem = " << durr.count() << "ns/op" << std::endl; + std::cerr << "w:elem/s = " << 1000000000.0f/durr.count() << std::endl; + + begin = std::chrono::high_resolution_clock::now(); + + for(size_t idx=0;idx(std::chrono::high_resolution_clock::now() - begin)/tot; + + std::cerr << "r:durr/elem = " << durr.count() << "ns/op" << std::endl; + std::cerr << "r:elem/s = " << 1000000000.0f/durr.count() << std::endl; request.op = db_op::read; sendto(