Goddess of Justice DB, the database used for storage on IzaroDFS
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 

329 rader
7.3 KiB

#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <variant>

#include "commander.hpp"
#include "database.hpp"
#include "network.hpp"
int main(
[[maybe_unused]] int argc,
[[maybe_unused]] char** argv
)
{
CMD::commander cmd_args(argc, argv);
std::string database_str = "/data/";
if(cmd_args.isFlagSet("-database"))
{
database_str = cmd_args.getFlagValue("-database");
}
std::string utest_str = "/tmp/";
if(cmd_args.isFlagSet("-utest"))
{
utest_str = cmd_args.getFlagValue("-utest");
}
size_t database_size = 4096;
if(cmd_args.isFlagSet("-page-count"))
{
try{
database_size = std::stoll(cmd_args.getFlagValue("-page-count"));
} catch (...) {
std::cerr << "Invalid page-count value" << std::endl;
return 1;
}
}
size_t db_port = 20450;
if(cmd_args.isFlagSet("-port"))
{
try{
db_port = std::stoi(cmd_args.getFlagValue("-port"));
if(db_port>std::numeric_limits<uint16_t>::max()) throw std::runtime_error("invalid_port");
} catch (...) {
std::cerr << "Invalid port value" << std::endl;
return 1;
}
}
#ifdef UNITTEST
size_t utest_size = 4096;
if(cmd_args.isFlagSet("-utest-count"))
{
try{
utest_size = std::stoll(cmd_args.getFlagValue("-utest-count"));
} catch (...) {
std::cerr << "Invalid utest-count value" << std::endl;
return 1;
}
}
{
database db(database::create(utest_str, utest_size*2));
db_page v;
v.fill(1);
record_identifier tar;
auto begin = std::chrono::high_resolution_clock::now();
for(size_t idx=0;idx<utest_size;idx++)
{
tar.y.internal = idx;
db.write(tar, v);
}
auto durr = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - begin)/utest_size;
std::cerr << "durr/elem = " << durr.count() << "ns/op" << std::endl;
auto a = db.read(tar);
if(a.second[0] != 1) {
std::cerr << "read failed" << std::endl;
}
db.remove(tar);
a = db.read(tar);
if(a.second[0] == 1) {
std::cerr << "remove failed" << std::endl;
}
{
db.write(tar, v);
a = db.read(tar);
if(a.second[0] != 1) {
std::cerr << "read failed" << std::endl;
}
db.remove(tar);
a = db.read(tar);
if(a.second[0] != 0) {
std::cerr << "remove failed" << std::endl;
}
v.fill(2);
db.write(tar, v);
}
database reop(database::open(utest_str));
{
a = reop.read(tar);
if(a.second[0] != 2) {
std::cerr << "reopen read failed" << std::endl;
}
v.fill(3);
reop.write(tar, v);
a = reop.read(tar);
if(a.second[0] != 3) {
std::cerr << "reopen write failed" << std::endl;
}
}
reop.pages.clear();
reop.metadata.clear();
reop.records.clear();
reop.delete_table.clear();
db.pages.clear();
db.metadata.clear();
db.records.clear();
db.delete_table.clear();
}
#endif
if(cmd_args.isFlagSet("-create"))
{
try{
database creat(database::create(database_str, database_size));
std::cout << "Created" << std::endl;
return 0;
} catch (...) {
std::cerr << "Creating of " << database_str << " of " << database_size << " pages failed" << std::endl;
return 1;
}
}
try{
database run_db(database::open(database_str));
auto soc = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(db_port);
memset((void*)&addr.sin_addr, 0, sizeof(addr.sin_addr));
bind(soc,(struct sockaddr*)&addr,sizeof(addr));
do{
received_data recv;
sending_data reply;
sockaddr_in client;
socklen_t packet_sz;
size_t reply_size;
recvfrom(
soc,
(void*)&recv,
sizeof(received_data),
MSG_WAITFORONE,
(struct sockaddr*)&client,
&packet_sz
);
if(packet_sz < sizeof(db_op)) continue;
switch (static_cast<db_op>(recv.op))
{
case db_op::version:
reply.rep_id = recv.rep_id;
reply.identifier.record_head.split.x = 1;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier.record_head.split.x);
break;
case db_op::read:
{
reply.rep_id = recv.rep_id;
auto req = run_db.read(recv.identifier);
if(req.first.timestamp != 0)
{
reply.identifier = req.first;
reply.page = req.second;
reply_size = sizeof(reply);
}
else
{
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
}
break;
case db_op::write:
{
reply.rep_id = recv.rep_id;
try{
auto req = run_db.write(recv.identifier, recv.page);
reply.identifier = req;
} catch (...) {
std::cerr << "cluster overfull"<< std::endl;
}
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
// case db_op::sremove:
case db_op::remove:
{
reply.rep_id = recv.rep_id;
run_db.remove(recv.identifier);
reply.identifier.record_head.split = recv.identifier;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
case db_op::stats:
{
reply.rep_id = recv.rep_id;
auto stats = (stats_data*)&reply.page;
stats->total_pages = (*run_db.metadata).page_cnt;
stats->total_records = (*run_db.metadata).record_cnt;
stats->free_deleted = (*run_db.metadata).last_delete;
stats->free =
(*run_db.metadata).last_delete
+ (*run_db.metadata).page_cnt
- (*run_db.metadata).last_page;
uint64_t cow_full = 0;
for(auto& elem : run_db.records)
{
if(elem.second.timestamp == std::numeric_limits<uint64_t>::max())
{
++cow_full;
}
}
stats->cow_full = cow_full;
reply_size = sizeof(reply);
}
break;
case db_op::sallocate:
{
reply.rep_id = recv.rep_id;
reply.identifier = run_db.try_allocate(recv.identifier);
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
case db_op::sread:
{
reply.rep_id = recv.rep_id;
auto req = run_db.stepped_read(recv.identifier);
if(req.first.timestamp != 0)
{
reply.identifier = req.first;
reply.page = req.second;
reply_size = sizeof(reply);
}
else
{
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
}
break;
case db_op::swrite:
{
reply.rep_id = recv.rep_id;
try{
auto req = run_db.stepped_write(recv.identifier, recv.page);
reply.identifier = req;
} catch (...) {
std::cerr << "cluster overfull"<< std::endl;
}
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
case db_op::confirm:
{
reply.rep_id = recv.rep_id;
run_db.confirm(recv.identifier, *(bitops::regulated<uint64_t>*)&recv.page);
reply_size = sizeof(reply.rep_id);
}
break;
default:
std::cerr << "bad_request " << (uint32_t)static_cast<db_op>(recv.op) << std::endl;
continue;
break;
}
sendto(
soc,
(void*)&reply,
reply_size,
0,
(struct sockaddr*)&client,
(socklen_t)sizeof(client)
);
//std::cerr << "reply to " << (uint32_t)static_cast<db_op>(recv.op) << std::endl;
}while(true);
} catch (...) {
std::cerr << "GoJDB crashed !" << std::endl;
return 1;
}
}