Explorar el Código

Improvements of netcode, added more statistics

master
Ludovic 'Archivist' Lagouardette hace 5 años
padre
commit
713dc83fe4
Se han modificado 7 ficheros con 183 adiciones y 14 borrados
  1. +1
    -1
      Makefile
  2. +33
    -2
      include/database.hpp
  3. +6
    -1
      include/fsized_map.h
  4. +14
    -1
      include/network.hpp
  5. +8
    -1
      src/db_stats.cpp
  6. +87
    -6
      src/izaro-storage.cpp
  7. +34
    -2
      src/test_client.cpp

+ 1
- 1
Makefile Ver fichero

@ -58,7 +58,7 @@ $(OBJ_DIR)/%.o: %.cpp
$(TARGET): $(OBJECTS) $(TJS_OBJECTS) build
make -C CommandEr
@mkdir -p $(@D)
$(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/$(TARGETNAME) src/$(TARGET) $(OBJECTS) $(TJS_OBJECTS)
$(CXX) $(CXXFLAGS) -pthread $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/$(TARGETNAME) src/$(TARGET) $(OBJECTS) $(TJS_OBJECTS)
$(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/test_client src/test_client.cpp
$(CXX) $(CXXFLAGS) $(INCLUDE) $(LDFLAGS) -o $(APP_DIR)/db_stats src/db_stats.cpp

+ 33
- 2
include/database.hpp Ver fichero

@ -39,6 +39,21 @@ public:
, delete_table{delete_cnt, deletions}
{}
static void mark_empty_page(db_page& gojp, size_t idx)
{
auto& gojmark = *(std::array<bitops::regulated<uint32_t>, db_page_size/sizeof(uint32_t)>*)&gojp;
bitops::regulated<uint32_t> mark = 0;
std::array<char,4> mark_array = {'G', 'o', 'J', '@'};
mark.internal = *(uint32_t*)&mark_array;
uint32_t slice = static_cast<uint32_t>(idx);
for(size_t mpos = 0; mpos < gojmark.size(); mpos+=2)
{
gojmark[mpos] = mark;
gojmark[mpos+1] = slice++;
}
}
static database&& create(const std::string dir, size_t page_nb) {
database ret{
dir+"records",
@ -50,16 +65,31 @@ public:
page_nb/8
};
std::cout << "Record table: ";
for(auto& n : ret.records)
{
n.second.timestamp = 0;
n.second.offset = 0;
n.second.flags = 0;
}
std::cout << "DONE" << std::endl;
std::cout << "Delete table: ";
for(auto& n : ret.delete_table)
{
n = std::numeric_limits<size_t>::max();
}
std::cout << "DONE" << std::endl;
for(size_t idx = 0; idx < ret.pages.size(); idx++)
{
mark_empty_page(ret.pages[idx], idx);
if(idx % 120000 == 0)
{
std::cout << "Page markings: " << uint16_t(100.0f*float(idx)/float(ret.pages.size())) << "%" << std::endl;
}
}
std::cout << "Page markings: DONE" << std::endl;
(*ret.metadata).last_page = 0;
(*ret.metadata).last_delete = 0;
@ -217,9 +247,10 @@ public:
value.timestamp = std::numeric_limits<uint64_t>::max();
(*metadata).last_delete+=1;
delete_table[(*metadata).last_delete-1] = value.offset;
mark_empty_page(pages[value.offset], value.offset);
value.offset = 0;
break;
}
}
break;
}
hashed_roll++;

+ 6
- 1
include/fsized_map.h Ver fichero

@ -63,7 +63,8 @@ struct [[gnu::packed]] record{
bitops::regulated<uint32_t> flags = 0;
};
using db_page = std::array<uint8_t, 16384>;
constexpr size_t db_page_size = 16384;
using db_page = std::array<uint8_t, db_page_size>;
template<typename T>
struct mmap_ptr{
@ -170,6 +171,10 @@ class mmap_array : public mmap_ptr
return item_size;
}
int fd_v() const {
return fd;
}
constexpr mmap_ptr<T> begin() const {
mmap_ptr<T> ret = *this;
return ret;

+ 14
- 1
include/network.hpp Ver fichero

@ -13,7 +13,8 @@ enum class db_op : uint32_t {
swrite = 6,
sallocate = 7,
sremove = 3,
confirm = 8
confirm = 8,
bulk_write = 9
};
struct [[gnu::packed]] received_data {
@ -23,6 +24,13 @@ struct [[gnu::packed]] received_data {
db_page page = {0};
};
struct [[gnu::packed]] received_bulk_data {
bitops::regulated<db_op> op = db_op::version;
bitops::regulated<uint64_t> rep_id = 0;
std::array<record_identifier, 4096/sizeof(record_identifier)> identifiers;
std::array<db_page, 4096/sizeof(record_identifier)> page;
};
struct [[gnu::packed]] sending_data {
bitops::regulated<uint64_t> rep_id = 0;
record identifier = record{};
@ -36,4 +44,9 @@ struct [[gnu::packed]] stats_data {
bitops::regulated<uint64_t> total_records;
bitops::regulated<uint64_t> total_delete;
bitops::regulated<uint64_t> cow_full;
bitops::regulated<uint64_t> free_records;
bitops::regulated<uint64_t> sync_rate_µs;
bitops::regulated<uint64_t> sync_duration_µs;
bitops::regulated<uint64_t> max_sync_duration_µs;
bitops::regulated<uint64_t> avg_sync_duration_µs;
};

+ 8
- 1
src/db_stats.cpp Ver fichero

@ -58,13 +58,20 @@ int main(int argc, char** argv)
std::cout << "free_pages\t" << stats->free << std::endl;
std::cout << "deleted_pages\t" << stats->free_deleted << std::endl;
std::cout << "total_pages\t" << stats->total_pages << std::endl;
std::cout << "usable_records\t" << stats->free_records << std::endl;
std::cout << "total_records\t" << stats->total_records << std::endl;
std::cout << "delete_cache\t" << stats->total_delete << std::endl;
std::cout << "inaccessible\t" << stats->cow_full << std::endl;
std::cout << "sync_rate\t" << stats->sync_rate_µs << "µs (" << stats->sync_rate_µs/1000000.0f << "s)" << std::endl;
std::cout << "sync_durr\t" << stats->sync_duration_µs << "µs (" << stats->sync_duration_µs/1000000.0f << "s)" << std::endl;
std::cout << "max_sync_durr\t" << stats->max_sync_duration_µs << "µs (" << stats->max_sync_duration_µs/1000000.0f << "s)" << std::endl;
std::cout << "avg_sync_durr\t" << stats->avg_sync_duration_µs << "µs (" << stats->avg_sync_duration_µs/1000000.0f << "s)" << std::endl;
return 0;
}
}catch(...) {}
}catch(...) {
exit(13);
}
std::cerr << "Invalid command, expects ``db_stats IPV4 PORT''" << std::endl;
return 1;
}

+ 87
- 6
src/izaro-storage.cpp Ver fichero

@ -12,17 +12,77 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <atomic>
#include <thread>
#include <stdio.h>
#include "commander.hpp"
constexpr auto record_sync_freq = std::chrono::microseconds(500000);
std::atomic<std::chrono::microseconds> last_cycle_ts;
std::atomic<std::chrono::microseconds> last_cycle_durr;
std::atomic<std::chrono::microseconds> max_cycle_durr;
std::atomic<std::chrono::microseconds> avg_cycle_durr;
std::atomic<bool> will_stop{false};
void sync_records_thread(const database& db)
{
std::chrono::microseconds total{0};
uint16_t count = 0;
do{
auto begin = std::chrono::high_resolution_clock::now();
fdatasync(db.records.fd_v());
auto durr = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - begin);
total += durr;
count++;
auto avg = total/count;
if(count>20)
{
total -= avg;
count--;
}
avg_cycle_durr.store(avg);
last_cycle_durr.store(durr);
if(max_cycle_durr.load() < durr)
{
max_cycle_durr.store(durr);
}
if(durr<record_sync_freq)
{
std::this_thread::sleep_for(record_sync_freq - durr);
}
}while(!will_stop);
}
enum class cleanup_type : size_t {
normal = sizeof(received_data),
complete = sizeof(received_bulk_data)
};
template<cleanup_type mode>
void buffer_cleanup(received_bulk_data& recv_impl) {
constexpr size_t total = ((size_t)mode)/(((size_t)mode % 8 == 0) ? 8 : 4);
for(
size_t i = 0;
i < total;
i++
)
{
if constexpr ((size_t)mode % 8 == 0)
{
((uint64_t*)&recv_impl)[i] = 0;
} else {
((uint32_t*)&recv_impl)[i] = 0;
}
}
}
int main(
[[maybe_unused]] int argc,
[[maybe_unused]] char** argv
)
{
CMD::commander cmd_args(argc, argv);
std::string database_str = "/data/";
@ -179,17 +239,20 @@ int main(
addr.sin_port = htons(db_port);
memset((void*)&addr.sin_addr, 0, sizeof(addr.sin_addr));
bind(soc,(struct sockaddr*)&addr,sizeof(addr));
std::thread syncher(sync_records_thread, run_db);
received_bulk_data recv_impl;
buffer_cleanup<cleanup_type::complete>(recv_impl);
do{
received_data recv;
received_data& recv = *(received_data*)&recv_impl;
sending_data reply;
sockaddr_in client;
socklen_t packet_sz;
size_t reply_size;
recvfrom(
soc,
(void*)&recv,
sizeof(received_data),
(void*)&recv_impl,
sizeof(received_bulk_data),
MSG_WAITFORONE,
(struct sockaddr*)&client,
&packet_sz
@ -203,6 +266,7 @@ int main(
reply.rep_id = recv.rep_id;
reply.identifier.record_head.split.x = 1;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier.record_head.split.x);
buffer_cleanup<cleanup_type::normal>(recv_impl);
break;
case db_op::read:
{
@ -218,6 +282,7 @@ int main(
{
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::write:
@ -230,6 +295,7 @@ int main(
std::cerr << "cluster overfull"<< std::endl;
}
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
// case db_op::sremove:
@ -239,6 +305,7 @@ int main(
run_db.remove(recv.identifier);
reply.identifier.record_head.split = recv.identifier;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::stats:
@ -253,15 +320,25 @@ int main(
+ (*run_db.metadata).page_cnt
- (*run_db.metadata).last_page;
uint64_t cow_full = 0;
uint64_t usable_records = 0;
for(auto& elem : run_db.records)
{
if(elem.second.timestamp == std::numeric_limits<uint64_t>::max())
if (elem.second.timestamp == std::numeric_limits<uint64_t>::max())
{
++cow_full;
++usable_records;
} else if(elem.second.timestamp == 0) {
++usable_records;
}
}
stats->cow_full = cow_full;
stats->free_records = usable_records;
stats->sync_rate_µs = record_sync_freq.count();
stats->sync_duration_µs = last_cycle_durr.load().count();
stats->max_sync_duration_µs = max_cycle_durr.load().count();
stats->avg_sync_duration_µs = avg_cycle_durr.load().count();
reply_size = sizeof(reply);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::sallocate:
@ -269,6 +346,7 @@ int main(
reply.rep_id = recv.rep_id;
reply.identifier = run_db.try_allocate(recv.identifier);
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::sread:
@ -285,6 +363,7 @@ int main(
{
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::swrite:
@ -297,6 +376,7 @@ int main(
std::cerr << "cluster overfull"<< std::endl;
}
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
case db_op::confirm:
@ -304,6 +384,7 @@ int main(
reply.rep_id = recv.rep_id;
run_db.confirm(recv.identifier, *(bitops::regulated<uint64_t>*)&recv.page);
reply_size = sizeof(reply.rep_id);
buffer_cleanup<cleanup_type::normal>(recv_impl);
}
break;
default:

+ 34
- 2
src/test_client.cpp Ver fichero

@ -8,7 +8,11 @@ int main(int, char** argv) {
{
std::random_device hrng{};
std::minstd_rand srng{hrng()};
auto seed = hrng();
std::minstd_rand srng{seed};
std::minstd_rand cpyrng{seed};
auto soc = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in server;
@ -53,7 +57,35 @@ int main(int, char** argv) {
auto durr = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - begin)/tot;
std::cerr << "durr/elem = " << durr.count() << "ns/op" << std::endl;
std::cerr << "w:durr/elem = " << durr.count() << "ns/op" << std::endl;
std::cerr << "w:elem/s = " << 1000000000.0f/durr.count() << std::endl;
begin = std::chrono::high_resolution_clock::now();
for(size_t idx=0;idx<tot;idx++)
{
tar.x.internal = cpyrng();
tar.y.internal = cpyrng();
request.op = db_op::read;
sendto(
soc,
(void*)&request,
sizeof(request),
0,
(struct sockaddr*)&server,
(socklen_t)sizeof(server)
);
recv(soc, &reply, sizeof(reply), 0);
if(reply.page[0] != 1)
{
std::cerr << "read failed" << std::endl;
}
}
durr = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - begin)/tot;
std::cerr << "r:durr/elem = " << durr.count() << "ns/op" << std::endl;
std::cerr << "r:elem/s = " << 1000000000.0f/durr.count() << std::endl;
request.op = db_op::read;
sendto(

Cargando…
Cancelar
Guardar