// Goddess of Justice DB, the database used for storage on IzaroDFS
 
 
 

// 425 lines
// 11 KiB

#pragma once
#include "fsized_map.h"
#include <chrono>
#include <random>
/// Three-valued mode selector.
/// NOTE(review): nothing in this header uses the enum; presumably it selects
/// data set A, data set B, or both -- confirm against the callers.
enum class data_mode {
    A  = 0,
    B  = 1,
    AB = 2
};
// Bookkeeping block for a database; a single instance lives in the mapped
// "meta" file. The field order is part of the stored layout -- do not reorder.
struct metadata_t{
// Number of slots in the record table (set by database::create).
bitops::regulated<uint64_t> record_cnt;
// Number of pages in the page table (set by database::create).
bitops::regulated<uint64_t> page_cnt;
// Number of slots in the delete table (set by database::create).
bitops::regulated<uint64_t> delete_cnt;
// Index of the next never-used page; advanced by one on each write that
// does not recycle a deleted page.
bitops::regulated<uint64_t> last_page;
// Number of recyclable page indices currently stacked in the delete table
// (the top of the stack is at index last_delete-1).
bitops::regulated<uint64_t> last_delete;
};
class database {
database(){}
public:
mmap_array<std::pair<bitops::regulated<uint64_t>, record>> records;
mmap_array<db_page> pages;
mmap_array<metadata_t> metadata;
mmap_array<size_t> delete_table;
database(
const std::string& records,
const std::string& pages,
const std::string& deletions,
const std::string& meta,
const size_t& record_cnt = 4096,
const size_t& page_cnt = 4096,
const size_t& delete_cnt = 512
)
: records{record_cnt, records}
, pages{page_cnt, pages}
, metadata{(size_t)1, meta}
, delete_table{delete_cnt, deletions}
{}
static void mark_empty_page(db_page& gojp, size_t idx)
{
auto& gojmark = *(std::array<bitops::regulated<uint32_t>, db_page_size/sizeof(uint32_t)>*)&gojp;
bitops::regulated<uint32_t> mark = 0;
std::array<char,4> mark_array = {'G', 'o', 'J', '@'};
mark.internal = *(uint32_t*)&mark_array;
uint32_t slice = static_cast<uint32_t>(idx);
for(size_t mpos = 0; mpos < gojmark.size(); mpos+=2)
{
gojmark[mpos] = mark;
gojmark[mpos+1] = slice++;
}
}
static database&& create(const std::string dir, size_t page_nb) {
database ret{
dir+"records",
dir+"pages",
dir+"deleted",
dir+"meta",
page_nb+page_nb/2,
page_nb,
page_nb/8
};
std::cout << "Record table: ";
for(auto& n : ret.records)
{
n.second.timestamp = 0;
n.second.offset = 0;
n.second.flags = 0;
}
std::cout << "DONE" << std::endl;
std::cout << "Delete table: ";
for(auto& n : ret.delete_table)
{
n = std::numeric_limits<size_t>::max();
}
std::cout << "DONE" << std::endl;
for(size_t idx = 0; idx < ret.pages.size(); idx++)
{
mark_empty_page(ret.pages[idx], idx);
if(idx % 120000 == 0)
{
std::cout << "Page markings: " << uint16_t(100.0f*float(idx)/float(ret.pages.size())) << "%" << std::endl;
}
}
std::cout << "Page markings: DONE" << std::endl;
(*ret.metadata).last_page = 0;
(*ret.metadata).last_delete = 0;
(*ret.metadata).record_cnt = page_nb+page_nb/2;
(*ret.metadata).page_cnt = page_nb;
(*ret.metadata).delete_cnt = page_nb/8;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wreturn-local-addr"
return std::move(ret);
#pragma GCC diagnostic pop
}
static database&& open(const std::string dir) {
mmap_array<metadata_t> tmp{(size_t)1, dir+"meta"};
database ret{
dir+"records",
dir+"pages",
dir+"deleted",
dir+"meta",
(*tmp).record_cnt,
(*tmp).page_cnt,
(*tmp).delete_cnt
};
tmp.clear();
ret.records.enforce_caching();
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wreturn-local-addr"
return std::move(ret);
#pragma GCC diagnostic pop
}
/**************************************************************************
* *
* NO CONFIRM OPS *
* *
* *
*************************************************************************/
record write(const record_identifier& target, const db_page& value){
uint64_t page = std::numeric_limits<uint64_t>::max();;
size_t off = std::numeric_limits<size_t>::max();
if(metadata[0].last_delete>0)
{
off = (*metadata).last_delete;
page = delete_table[off-1];
} else {
page = (*metadata).last_page;
if(page>=pages.size()) {
throw std::runtime_error("PAGE STARVATION! MUST EXIT NOW");
}
}
if(page == std::numeric_limits<uint64_t>::max())
{
throw std::runtime_error("PAGE ERROR! MUST EXIT NOW");
}
pages[page] = value;
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
bool succeed = false;
uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::pair<bitops::regulated<uint64_t>,record> tmp{0, record{}};
tmp.first = hashed;
tmp.second.record_head.split = target;
tmp.second.timestamp = ts;
tmp.second.offset = page;
tmp.second.flags = (uint32_t)record_flags::confirmation;
do{
uint64_t pos = hashed_roll % records.size();
switch (static_cast<uint64_t>(records[pos].second.timestamp)) {
case 0:
[[fallthrough]];
case std::numeric_limits<uint64_t>::max():
records[pos] = tmp;
succeed = true;
break;
default:
break;
}
hashed_roll++;
}while(!succeed);
if(off != std::numeric_limits<size_t>::max())
{
(*metadata).last_delete += -1;
delete_table[off] = std::numeric_limits<size_t>::max();
} else {
(*metadata).last_page += (size_t)1;
}
return tmp.second;
}
std::pair<record, db_page> read(const record_identifier& target) {
std::pair<record, db_page> ret;
ret.second.fill(0);
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return ret;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(ret.first.timestamp<value.timestamp)
{
ret.first = value;
ret.second = pages[value.offset];
}
break;
}
}
hashed_roll++;
}while(true);
return ret;
}
void remove(const record_identifier& target) {
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
value.timestamp = std::numeric_limits<uint64_t>::max();
(*metadata).last_delete+=1;
delete_table[(*metadata).last_delete-1] = value.offset;
mark_empty_page(pages[value.offset], value.offset);
value.offset = 0;
}
break;
}
hashed_roll++;
}while(true); // return only happens on hitting a case 0
}
void rollback(const record_identifier&) {
}
/**************************************************************************
* *
* CONFIRM OPS *
* *
* *
*************************************************************************/
record stepped_write(const record_identifier& target, const db_page& value){
uint64_t page = std::numeric_limits<uint64_t>::max();;
size_t off = std::numeric_limits<size_t>::max();
if(metadata[0].last_delete>0)
{
off = (*metadata).last_delete;
page = delete_table[off-1];
} else {
page = (*metadata).last_page;
if(page>=pages.size()) {
throw std::runtime_error("PAGE STARVATION! MUST EXIT NOW");
}
}
if(page == std::numeric_limits<uint64_t>::max())
{
throw std::runtime_error("PAGE ERROR! MUST EXIT NOW");
}
pages[page] = value;
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
bool succeed = false;
uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::pair<bitops::regulated<uint64_t>,record> tmp{0, record{}};
tmp.first = hashed;
tmp.second.record_head.split = target;
tmp.second.timestamp = ts;
tmp.second.offset = page;
tmp.second.flags = 0;
do{
uint64_t pos = hashed_roll % records.size();
switch (static_cast<uint64_t>(records[pos].second.timestamp)) {
case 0:
[[fallthrough]];
case std::numeric_limits<uint64_t>::max():
records[pos] = tmp;
succeed = true;
break;
default:
break;
}
hashed_roll++;
}while(!succeed);
if(off != std::numeric_limits<size_t>::max())
{
(*metadata).last_delete += -1;
delete_table[off] = std::numeric_limits<size_t>::max();
} else {
(*metadata).last_page += (size_t)1;
}
return tmp.second;
}
record try_allocate(const record_identifier& target)
{
auto attempt = read(target);
if(attempt.first.timestamp == 0)
{
db_page rnd_page;
{
std::random_device dev;
std::minstd_rand temprng(dev());
auto tmp = (std::array<uint32_t, sizeof(db_page)/sizeof(uint32_t)>*)&rnd_page;
std::generate(tmp->begin(), tmp->end(), temprng);
}
return write(target, rnd_page);
}
return record{};
}
std::pair<record, db_page> stepped_read(const record_identifier& target) {
std::pair<record, db_page> ret;
ret.second.fill(0);
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return ret;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(static_cast<uint32_t>(value.flags) & (uint32_t)record_flags::confirmation)
if(ret.first.timestamp<value.timestamp)
{
ret.first = value;
ret.second = pages[value.offset];
}
break;
}
}
hashed_roll++;
}while(true);
return ret;
}
void stepped_remove(const record_identifier& target) {
remove(target);
}
void confirm(const record_identifier& target, const bitops::regulated<uint64_t>& timestamp) {
uint64_t hashed = std::hash<record_identifier>{}(target);
uint64_t hashed_roll = hashed;
do{
uint64_t pos = hashed_roll % records.size();
auto& value = records[pos].second;
switch (static_cast<uint64_t>(value.timestamp)) {
case 0:
return;
case std::numeric_limits<uint64_t>::max():
break;
default:
if(records[pos].first == hashed)
if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
{
if(timestamp == value.timestamp)
{
value.flags = (uint32_t)value.flags | (uint32_t)record_flags::confirmation;
}
break;
}
}
hashed_roll++;
}while(true);
}
};