Goddess of Justice DB, the database used for storage on IzaroDFS
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

424 lines
11 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. #pragma once
  #include "fsized_map.h"
  #include <algorithm>
  #include <array>
  #include <chrono>
  #include <cstddef>
  #include <cstdint>
  #include <cstring>
  #include <functional>
  #include <iostream>
  #include <limits>
  #include <random>
  #include <stdexcept>
  #include <string>
  #include <utility>
  /// Three-way mode selector with enumerators A, B and AB.
  /// NOTE(review): nothing in this header reads it, so what each mode means has
  /// to be confirmed at the call sites.
  enum class data_mode { A = 0, B = 1, AB = 2 };
  10. struct metadata_t{
  11. bitops::regulated<uint64_t> record_cnt;
  12. bitops::regulated<uint64_t> page_cnt;
  13. bitops::regulated<uint64_t> delete_cnt;
  14. bitops::regulated<uint64_t> last_page;
  15. bitops::regulated<uint64_t> last_delete;
  16. };
  17. class database {
  18. database(){}
  19. public:
  20. mmap_array<std::pair<bitops::regulated<uint64_t>, record>> records;
  21. mmap_array<db_page> pages;
  22. mmap_array<metadata_t> metadata;
  23. mmap_array<size_t> delete_table;
  24. database(
  25. const std::string& records,
  26. const std::string& pages,
  27. const std::string& deletions,
  28. const std::string& meta,
  29. const size_t& record_cnt = 4096,
  30. const size_t& page_cnt = 4096,
  31. const size_t& delete_cnt = 512
  32. )
  33. : records{record_cnt, records}
  34. , pages{page_cnt, pages}
  35. , metadata{(size_t)1, meta}
  36. , delete_table{delete_cnt, deletions}
  37. {}
  38. static void mark_empty_page(db_page& gojp, size_t idx)
  39. {
  40. auto& gojmark = *(std::array<bitops::regulated<uint32_t>, db_page_size/sizeof(uint32_t)>*)&gojp;
  41. bitops::regulated<uint32_t> mark = 0;
  42. std::array<char,4> mark_array = {'G', 'o', 'J', '@'};
  43. mark.internal = *(uint32_t*)&mark_array;
  44. uint32_t slice = static_cast<uint32_t>(idx);
  45. for(size_t mpos = 0; mpos < gojmark.size(); mpos+=2)
  46. {
  47. gojmark[mpos] = mark;
  48. gojmark[mpos+1] = slice++;
  49. }
  50. }
  51. static database&& create(const std::string dir, size_t page_nb) {
  52. database ret{
  53. dir+"records",
  54. dir+"pages",
  55. dir+"deleted",
  56. dir+"meta",
  57. page_nb+page_nb/2,
  58. page_nb,
  59. page_nb/8
  60. };
  61. std::cout << "Record table: ";
  62. for(auto& n : ret.records)
  63. {
  64. n.second.timestamp = 0;
  65. n.second.offset = 0;
  66. n.second.flags = 0;
  67. }
  68. std::cout << "DONE" << std::endl;
  69. std::cout << "Delete table: ";
  70. for(auto& n : ret.delete_table)
  71. {
  72. n = std::numeric_limits<size_t>::max();
  73. }
  74. std::cout << "DONE" << std::endl;
  75. for(size_t idx = 0; idx < ret.pages.size(); idx++)
  76. {
  77. mark_empty_page(ret.pages[idx], idx);
  78. if(idx % 120000 == 0)
  79. {
  80. std::cout << "Page markings: " << uint16_t(100.0f*float(idx)/float(ret.pages.size())) << "%" << std::endl;
  81. }
  82. }
  83. std::cout << "Page markings: DONE" << std::endl;
  84. (*ret.metadata).last_page = 0;
  85. (*ret.metadata).last_delete = 0;
  86. (*ret.metadata).record_cnt = page_nb+page_nb/2;
  87. (*ret.metadata).page_cnt = page_nb;
  88. (*ret.metadata).delete_cnt = page_nb/8;
  89. #pragma GCC diagnostic push
  90. #pragma GCC diagnostic ignored "-Wreturn-local-addr"
  91. return std::move(ret);
  92. #pragma GCC diagnostic pop
  93. }
  94. static database&& open(const std::string dir) {
  95. mmap_array<metadata_t> tmp{(size_t)1, dir+"meta"};
  96. database ret{
  97. dir+"records",
  98. dir+"pages",
  99. dir+"deleted",
  100. dir+"meta",
  101. (*tmp).record_cnt,
  102. (*tmp).page_cnt,
  103. (*tmp).delete_cnt
  104. };
  105. tmp.clear();
  106. ret.records.enforce_caching();
  107. #pragma GCC diagnostic push
  108. #pragma GCC diagnostic ignored "-Wreturn-local-addr"
  109. return std::move(ret);
  110. #pragma GCC diagnostic pop
  111. }
  112. /**************************************************************************
  113. * *
  114. * NO CONFIRM OPS *
  115. * *
  116. * *
  117. *************************************************************************/
  118. record write(const record_identifier& target, const db_page& value){
  119. uint64_t page = std::numeric_limits<uint64_t>::max();;
  120. size_t off = std::numeric_limits<size_t>::max();
  121. if(metadata[0].last_delete>0)
  122. {
  123. off = (*metadata).last_delete;
  124. page = delete_table[off-1];
  125. } else {
  126. page = (*metadata).last_page;
  127. if(page>=pages.size()) {
  128. throw std::runtime_error("PAGE STARVATION! MUST EXIT NOW");
  129. }
  130. }
  131. if(page == std::numeric_limits<uint64_t>::max())
  132. {
  133. throw std::runtime_error("PAGE ERROR! MUST EXIT NOW");
  134. }
  135. pages[page] = value;
  136. uint64_t hashed = std::hash<record_identifier>{}(target);
  137. uint64_t hashed_roll = hashed;
  138. bool succeed = false;
  139. uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  140. std::pair<bitops::regulated<uint64_t>,record> tmp{0, record{}};
  141. tmp.first = hashed;
  142. tmp.second.record_head.split = target;
  143. tmp.second.timestamp = ts;
  144. tmp.second.offset = page;
  145. tmp.second.flags = (uint32_t)record_flags::confirmation;
  146. do{
  147. uint64_t pos = hashed_roll % records.size();
  148. switch (static_cast<uint64_t>(records[pos].second.timestamp)) {
  149. case 0:
  150. [[fallthrough]];
  151. case std::numeric_limits<uint64_t>::max():
  152. records[pos] = tmp;
  153. succeed = true;
  154. break;
  155. default:
  156. break;
  157. }
  158. hashed_roll++;
  159. }while(!succeed);
  160. if(off != std::numeric_limits<size_t>::max())
  161. {
  162. (*metadata).last_delete += -1;
  163. delete_table[off] = std::numeric_limits<size_t>::max();
  164. } else {
  165. (*metadata).last_page += (size_t)1;
  166. }
  167. return tmp.second;
  168. }
  169. std::pair<record, db_page> read(const record_identifier& target) {
  170. std::pair<record, db_page> ret;
  171. ret.second.fill(0);
  172. uint64_t hashed = std::hash<record_identifier>{}(target);
  173. uint64_t hashed_roll = hashed;
  174. do{
  175. uint64_t pos = hashed_roll % records.size();
  176. auto& value = records[pos].second;
  177. switch (static_cast<uint64_t>(value.timestamp)) {
  178. case 0:
  179. return ret;
  180. case std::numeric_limits<uint64_t>::max():
  181. break;
  182. default:
  183. if(records[pos].first == hashed)
  184. if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
  185. {
  186. if(ret.first.timestamp<value.timestamp)
  187. {
  188. ret.first = value;
  189. ret.second = pages[value.offset];
  190. }
  191. break;
  192. }
  193. }
  194. hashed_roll++;
  195. }while(true);
  196. return ret;
  197. }
  198. void remove(const record_identifier& target) {
  199. uint64_t hashed = std::hash<record_identifier>{}(target);
  200. uint64_t hashed_roll = hashed;
  201. do{
  202. uint64_t pos = hashed_roll % records.size();
  203. auto& value = records[pos].second;
  204. switch (static_cast<uint64_t>(value.timestamp)) {
  205. case 0:
  206. return;
  207. case std::numeric_limits<uint64_t>::max():
  208. break;
  209. default:
  210. if(records[pos].first == hashed)
  211. if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
  212. {
  213. value.timestamp = std::numeric_limits<uint64_t>::max();
  214. (*metadata).last_delete+=1;
  215. delete_table[(*metadata).last_delete-1] = value.offset;
  216. mark_empty_page(pages[value.offset], value.offset);
  217. value.offset = 0;
  218. }
  219. break;
  220. }
  221. hashed_roll++;
  222. }while(true); // return only happens on hitting a case 0
  223. }
  224. void rollback(const record_identifier&) {
  225. }
  226. /**************************************************************************
  227. * *
  228. * CONFIRM OPS *
  229. * *
  230. * *
  231. *************************************************************************/
  232. record stepped_write(const record_identifier& target, const db_page& value){
  233. uint64_t page = std::numeric_limits<uint64_t>::max();;
  234. size_t off = std::numeric_limits<size_t>::max();
  235. if(metadata[0].last_delete>0)
  236. {
  237. off = (*metadata).last_delete;
  238. page = delete_table[off-1];
  239. } else {
  240. page = (*metadata).last_page;
  241. if(page>=pages.size()) {
  242. throw std::runtime_error("PAGE STARVATION! MUST EXIT NOW");
  243. }
  244. }
  245. if(page == std::numeric_limits<uint64_t>::max())
  246. {
  247. throw std::runtime_error("PAGE ERROR! MUST EXIT NOW");
  248. }
  249. pages[page] = value;
  250. uint64_t hashed = std::hash<record_identifier>{}(target);
  251. uint64_t hashed_roll = hashed;
  252. bool succeed = false;
  253. uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
  254. std::pair<bitops::regulated<uint64_t>,record> tmp{0, record{}};
  255. tmp.first = hashed;
  256. tmp.second.record_head.split = target;
  257. tmp.second.timestamp = ts;
  258. tmp.second.offset = page;
  259. tmp.second.flags = 0;
  260. do{
  261. uint64_t pos = hashed_roll % records.size();
  262. switch (static_cast<uint64_t>(records[pos].second.timestamp)) {
  263. case 0:
  264. [[fallthrough]];
  265. case std::numeric_limits<uint64_t>::max():
  266. records[pos] = tmp;
  267. succeed = true;
  268. break;
  269. default:
  270. break;
  271. }
  272. hashed_roll++;
  273. }while(!succeed);
  274. if(off != std::numeric_limits<size_t>::max())
  275. {
  276. (*metadata).last_delete += -1;
  277. delete_table[off] = std::numeric_limits<size_t>::max();
  278. } else {
  279. (*metadata).last_page += (size_t)1;
  280. }
  281. return tmp.second;
  282. }
  283. record try_allocate(const record_identifier& target)
  284. {
  285. auto attempt = read(target);
  286. if(attempt.first.timestamp == 0)
  287. {
  288. db_page rnd_page;
  289. {
  290. std::random_device dev;
  291. std::minstd_rand temprng(dev());
  292. auto tmp = (std::array<uint32_t, sizeof(db_page)/sizeof(uint32_t)>*)&rnd_page;
  293. std::generate(tmp->begin(), tmp->end(), temprng);
  294. }
  295. return write(target, rnd_page);
  296. }
  297. return record{};
  298. }
  299. std::pair<record, db_page> stepped_read(const record_identifier& target) {
  300. std::pair<record, db_page> ret;
  301. ret.second.fill(0);
  302. uint64_t hashed = std::hash<record_identifier>{}(target);
  303. uint64_t hashed_roll = hashed;
  304. do{
  305. uint64_t pos = hashed_roll % records.size();
  306. auto& value = records[pos].second;
  307. switch (static_cast<uint64_t>(value.timestamp)) {
  308. case 0:
  309. return ret;
  310. case std::numeric_limits<uint64_t>::max():
  311. break;
  312. default:
  313. if(records[pos].first == hashed)
  314. if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
  315. {
  316. if(static_cast<uint32_t>(value.flags) & (uint32_t)record_flags::confirmation)
  317. if(ret.first.timestamp<value.timestamp)
  318. {
  319. ret.first = value;
  320. ret.second = pages[value.offset];
  321. }
  322. break;
  323. }
  324. }
  325. hashed_roll++;
  326. }while(true);
  327. return ret;
  328. }
  329. void stepped_remove(const record_identifier& target) {
  330. remove(target);
  331. }
  332. void confirm(const record_identifier& target, const bitops::regulated<uint64_t>& timestamp) {
  333. uint64_t hashed = std::hash<record_identifier>{}(target);
  334. uint64_t hashed_roll = hashed;
  335. do{
  336. uint64_t pos = hashed_roll % records.size();
  337. auto& value = records[pos].second;
  338. switch (static_cast<uint64_t>(value.timestamp)) {
  339. case 0:
  340. return;
  341. case std::numeric_limits<uint64_t>::max():
  342. break;
  343. default:
  344. if(records[pos].first == hashed)
  345. if(std::hash<record_identifier>{}(value.record_head.split) == hashed)
  346. {
  347. if(timestamp == value.timestamp)
  348. {
  349. value.flags = (uint32_t)value.flags | (uint32_t)record_flags::confirmation;
  350. }
  351. break;
  352. }
  353. }
  354. hashed_roll++;
  355. }while(true);
  356. }
  357. };