Browse Source

Prepared for addition of a consensus orchestration system

master
Ludovic 'Archivist' Lagouardette 5 years ago
parent
commit
cb6cdbef31
6 changed files with 169 additions and 21 deletions
  1. +1
    -1
      Makefile
  2. +149
    -6
      include/database.hpp
  3. +1
    -1
      include/endian.hpp
  4. +10
    -5
      include/fsized_map.h
  5. +1
    -1
      include/network.hpp
  6. +7
    -7
      src/izaro-storage.cpp

+ 1
- 1
Makefile View File

@ -19,7 +19,7 @@
#
CXX := -clang++
CXX := -g++
DEBUG := -g -O0 -DUNITTEST
RELEASE := -s -O3 -fno-rtti

+ 149
- 6
include/database.hpp View File

@ -95,7 +95,14 @@ public:
#pragma GCC diagnostic pop
}
void write(const record_identifier& target, const db_page& value){
/**************************************************************************
* *
* NO CONFIRM OPS *
* *
* *
*************************************************************************/
record write(const record_identifier& target, const db_page& value){
uint64_t page = std::numeric_limits<uint64_t>::max();;
size_t off = std::numeric_limits<size_t>::max();
if(metadata[0].last_delete>0)
@ -127,6 +134,7 @@ public:
tmp.second.record_head.split = target;
tmp.second.timestamp = ts;
tmp.second.offset = page;
tmp.second.flags = (uint32_t)record_flags::confirmation;
do{
uint64_t pos = hashed_roll % records.size();
@ -151,11 +159,11 @@ public:
} else {
(*metadata).last_page += (size_t)1;
}
return tmp.second;
}
std::pair<uint64_t, db_page> read(const record_identifier& target) {
std::pair<uint64_t, db_page> ret;
ret.first = 0;
std::pair<record, db_page> read(const record_identifier& target) {
std::pair<record, db_page> ret;
ret.second.fill(0);
uint64_t hashed = std::hash<record_identifier>{}(target);
@ -174,9 +182,9 @@ public:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(ret.first<value.timestamp)
if(ret.firstp">.timestamp<value.timestamp)
{
ret.first = value.timestamp;
ret.first = value;
ret.second = pages[value.offset];
}
break;
@ -220,4 +228,139 @@ public:
void rollback(const record_identifier&) {
}
/**************************************************************************
* *
* CONFIRM OPS *
* *
* *
*************************************************************************/
record stepped_write(const record_identifier& target, const db_page& value){
uint64_t page = std::numeric_limits<uint64_t>::max();;
size_t off = std::numeric_limits<size_t>::max();
if(metadata[0].last_delete>0)
{
off = (*metadata).last_delete;
page = delete_table[off-1];
} else {
page = (*metadata).last_page;
if(page>=pages.size()) {
throw std::runtime_error("PAGE STARVATION! MUST EXIT NOW");
}
}
if(page == std::numeric_limits<uint64_t>::max())
{
throw std::runtime_error("PAGE ERROR! MUST EXIT NOW");
}
pages[page] = value;
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
bool succeed = false;
uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::pair<bitops::regulated<uint64_t>,record> tmp{0, record{}};
tmp.first = hashed;
tmp.second.record_head.split = target;
tmp.second.timestamp = ts;
tmp.second.offset = page;
tmp.second.flags = 0;
do{
uint64_t pos = hashed_roll % records.size();
switch (static_cast<uint64_t>(records[pos].second.timestamp)) {
case 0:
[[fallthrough]];
case std::numeric_limits<uint64_t>::max():
records[pos] = tmp;
succeed = true;
break;
default:
break;
}
hashed_roll++;
}while(!succeed);
if(off != std::numeric_limits<size_t>::max())
{
(*metadata).last_delete += -1;
delete_table[off] = std::numeric_limits<size_t>::max();
} else {
(*metadata).last_page += (size_t)1;
}
return tmp.second;
}
std::pair<record, db_page> stepped_read(const record_identifier& target) {
std::pair<record, db_page> ret;
ret.second.fill(0);
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return ret;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(static_cast<uint32_t>(value.flags) & (uint32_t)record_flags::confirmation)
if(ret.first.timestamp<value.timestamp)
{
ret.first = value;
ret.second = pages[value.offset];
}
break;
}
}
hashed_roll++;
}while(true);
return ret;
}
void stepped_remove(const record_identifier& target) {
remove(target);
}
void confirm(const record_identifier& target, const bitops::regulated<uint64_t>& timestamp) {
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(timestamp == value.timestamp)
{
value.flags = (uint32_t)value.flags | (uint32_t)record_flags::confirmation;
}
break;
}
}
hashed_roll++;
}while(true);
}
};

+ 1
- 1
include/endian.hpp View File

@ -73,7 +73,7 @@ namespace bitops{
internal = swap_if_little(T(*this)+v);
}
constexpr operator T() {
constexpr operator T() k">const {
return swap_if_little(internal);
}
};

+ 10
- 5
include/fsized_map.h View File

@ -36,10 +36,10 @@ struct [[gnu::packed]] record_identifier{
template<>
struct std::hash<record_identifier> {
uint64_t operator() (const record_identifier& value) {
uint64_t v = o">*(uint32_t*)&value.x.internal;
uint64_t v = n">static_cast<uint32_t>(value.x);
v <<= 4;
v += o">*(uint32_t*)&value.y.internal;
v += n">static_cast<uint32_t>(value.y);
v ^= *(uint64_t*)&value.uuid;
v ^= *(((uint64_t*)&value.uuid)+1);
@ -47,15 +47,20 @@ struct std::hash {
}
};
struct record{
enum class record_flags : uint32_t {
confirmation = 1
};
struct [[gnu::packed]] record{
record()
{}
union{
union [[gnu::packed]] {
std::array<uint8_t, 24> full;
record_identifier split = record_identifier();
} record_head;
bitops::regulated<uint64_t> timestamp = 0;
size_t offset = 0;
bitops::regulated<uint64_t> offset = 0;
bitops::regulated<uint32_t> flags = 0;
};
using db_page = std::array<uint8_t, 16384>;

+ 1
- 1
include/network.hpp View File

@ -20,7 +20,7 @@ struct [[gnu::packed]] received_data {
struct [[gnu::packed]] sending_data {
bitops::regulated<uint64_t> rep_id = 0;
record_identifier identifier = record_identifier{};
record identifier = record{};
db_page page = {0};
};

+ 7
- 7
src/izaro-storage.cpp View File

@ -202,16 +202,16 @@ int main(
{
case db_op::version:
reply.rep_id = recv.rep_id;
reply.identifier.x = 1;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier.x);
reply.identifier.record_head.split.x = 1;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier.record_head.split.x);
break;
case db_op::read:
{
reply.rep_id = recv.rep_id;
auto req = run_db.read(recv.identifier);
if(req.first != 0)
if(req.first.timestamp != 0)
{
reply.identifier = recv.identifier;
reply.identifier = req.first;
reply.page = req.second;
reply_size = sizeof(reply);
}
@ -225,8 +225,8 @@ int main(
{
reply.rep_id = recv.rep_id;
try{
run_db.write(recv.identifier, recv.page);
reply.identifier = recv.identifier;
k">auto req = run_db.write(recv.identifier, recv.page);
reply.identifier = req;
} catch (...) {
std::cerr << "cluster overfull"<< std::endl;
}
@ -237,7 +237,7 @@ int main(
{
reply.rep_id = recv.rep_id;
run_db.remove(recv.identifier);
reply.identifier = recv.identifier;
reply.identifier.record_head.split = recv.identifier;
reply_size = sizeof(reply.rep_id) + sizeof(reply.identifier);
}
break;

Loading…
Cancel
Save