Browse Source

Added the operation for sharding

master
Ludovic 'Archivist' Lagouardette 5 years ago
parent
commit
decf51fab0
3 changed files with 70 additions and 1 deletions
  1. +21
    -0
      include/database.hpp
  2. +6
    -1
      include/network.hpp
  3. +43
    -0
      src/izaro-storage.cpp

+ 21
- 0
include/database.hpp View File

@ -1,6 +1,7 @@
#pragma once
#include "fsized_map.h"
#include <chrono>
#include <random>
struct metadata_t{
bitops::regulated<uint64_t> record_cnt;
@ -296,6 +297,26 @@ public:
return tmp.second;
}
record try_allocate(const record_identifier& target)
{
auto attempt = read(target);
if(attempt.first.timestamp == 0)
{
db_page rnd_page;
{
std::random_device dev;
std::minstd_rand temprng(dev());
auto tmp = (std::array<uint32_t, sizeof(db_page)/sizeof(uint32_t)>*)&rnd_page;
std::generate(tmp->begin(), tmp->end(), temprng);
}
return write(target, rnd_page);
}
return record{};
}
std::pair<record, db_page> stepped_read(const record_identifier& target) {
std::pair<record, db_page> ret;
ret.second.fill(0);

+ 6
- 1
include/network.hpp View File

@ -8,7 +8,12 @@ enum class db_op : uint32_t {
read = 1,
write = 2,
remove = 3,
stats = 4
stats = 4,
sread = 5,
swrite = 6,
sallocate = 7,
sremove = 3,
confirm = 8
};
struct [[gnu::packed]] received_data {

+ 43
- 0
src/izaro-storage.cpp View File

@ -233,6 +233,7 @@ int main(
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
// case db_op::sremove:
case db_op::remove:
{
reply.rep_id = recv.rep_id;
@ -264,6 +265,48 @@ int main(
reply_size = sizeof(reply);
}
break;
case db_op::sallocate:
{
reply.rep_id = recv.rep_id;
reply.identifier = run_db.try_allocate(recv.identifier);
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
case db_op::sread:
{
reply.rep_id = recv.rep_id;
auto req = run_db.stepped_read(recv.identifier);
if(req.first.timestamp != 0)
{
reply.identifier = req.first;
reply.page = req.second;
reply_size = sizeof(reply);
}
else
{
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
}
break;
case db_op::swrite:
{
reply.rep_id = recv.rep_id;
try{
auto req = run_db.stepped_write(recv.identifier, recv.page);
reply.identifier = req;
} catch (...) {
std::cerr << "cluster overfull"<< std::endl;
}
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;
case db_op::confirm:
{
reply.rep_id = recv.rep_id;
run_db.confirm(recv.identifier, *(bitops::regulated<uint64_t>*)&recv.page);
reply_size = sizeof(reply.rep_id);
}
break;
default:
std::cerr << "bad_request " << (uint32_t)static_cast<db_op>(recv.op) << std::endl;
continue;

Loading…
Cancel
Save