
Scheduling works

channel
Ludovic 'Archivist' Lagouardette committed 3 years ago
commit 9f1e9055ff
9 changed files with 299 additions and 87 deletions
  1. Makefile (+1, -1)
  2. include/gp/vfs/process_data.hpp (+8, -0)
  3. include/gp/vfs/runqueue.hpp (+33, -5)
  4. include/gp/vfs/scheduler.hpp (+16, -1)
  5. include/gp/vfs/scheduling/simple_lockfree_scheduling.hpp (+0, -44)
  6. include/gp/vfs/scheduling/simple_scheduling.hpp (+97, -0)
  7. include/gp/vfs/system.hpp (+74, -30)
  8. include/gp_config.hpp (+11, -1)
  9. tests/channel_test.cpp (+59, -5)

Makefile (+1, -1)

@@ -1,5 +1,5 @@
CXX= clang++-10
-CXXFLAGS= --std=c++20 -O2 -g -pthread -DGP_TESTS -DFUZZ_STRENGTH=100000 -DNO_BENCH=0 -pedantic \
+CXXFLAGS= --std=c++20 -O0 -g -pthread -DGP_TESTS -DFUZZ_STRENGTH=100000 -DNO_BENCH=0 -pedantic \
-fprofile-instr-generate -fcoverage-mapping -Wno-unknown-attributes -fno-omit-frame-pointer \
# -fsanitize=address -fsanitize-blacklist=blacklist.txt

include/gp/vfs/process_data.hpp (+8, -0)

@@ -38,6 +38,14 @@ namespace gp {
, state(gp::process_status::inactive)
, specifics(gp::buffer<char>{(char*)stack, stack_sz})
{}
process_data(process_data&& v)
: fn(v.fn)
, stack(v.stack)
, stack_sz(v.stack_sz)
, state(v.state)
, specifics(v.specifics)
{}
};
}

include/gp/vfs/runqueue.hpp (+33, -5)

@@ -3,13 +3,29 @@
#include "gp/vfs/process_data.hpp"
#include <atomic>
#include <new>
namespace gp {
struct topic_list{
struct node{
-std::atomic_bool is_locked;
+alignas(gp_config::limits::hardware_constructive_interference_size) std::atomic_bool is_locked;
gp::process_data* value;
-std::atomic<struct node*> next;
+alignas(gp_config::limits::hardware_constructive_interference_size) std::atomic<struct node*> next;
node()
{
is_locked = false;
value = nullptr;
next = nullptr;
}
node(node&& v)
{
v.try_acquire();
is_locked = false;
value = gp::move(v.value);
next = v.next.load();
}
bool try_acquire() noexcept {
bool expected = false;
@@ -24,6 +40,11 @@ namespace gp {
using node_ptr = struct node*;
using node_ptr_rep = std::atomic<struct node*>;
topic_list()
: start{nullptr}
, end{nullptr}
{}
node_ptr_rep start;
node_ptr_rep end;
@@ -36,9 +57,10 @@ namespace gp {
auto replace = ptr->next.load();
auto expected = ptr;
if(end.load() == ptr) {
-end.store(nullptr);
+replace = nullptr;
}
if(start.compare_exchange_strong(expected, replace)) {
+end.store(nullptr);
return ptr;
} else {
return nullptr;
@@ -51,21 +73,27 @@ namespace gp {
// ONLY PUSH ACQUIRED NODES,
// RELEASE WHEN NO LONGER IN USE
bool try_push(node_ptr node) {
node->next.store(nullptr);
auto ed = end.load();
if(ed) {
if(ed->try_acquire()) {
auto old_ed = ed;
node->next.store(ed);
if(end.compare_exchange_strong(ed, node)) {
-ed->release();
+node->release();
+old_ed->release();
return true;
} else {
-ed->release();
+node->release();
+old_ed->release();
return false;
}
} else return false;
} else {
if(end.compare_exchange_strong(ed, node)) {
start.store(node);
node->release();
return true;
} else {
return false;
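
The list above is driven through a small acquire/push/pop protocol (see the capitalized comment in try_push: nodes must be acquired before they are pushed and released when no longer in use). A minimal sketch of how a caller might drive that protocol, assuming only the gp::topic_list API visible in this diff; the helper names enqueue/dequeue and the spin-retry loops are illustrative, not part of the commit:

#include "gp/vfs/runqueue.hpp"

// Hypothetical helpers around gp::topic_list. Retrying in a tight loop mirrors
// the do/while retries used by the scheduling code further down this diff.
inline void enqueue(gp::topic_list& queue, gp::topic_list::node_ptr node) {
    while(!node->try_acquire()) {}   // own the node before publishing it
    while(!queue.try_push(node)) {}  // retry until the push wins its race
}

inline gp::topic_list::node_ptr dequeue(gp::topic_list& queue) {
    gp::topic_list::node_ptr v = nullptr;
    do {
        v = queue.try_pop();         // nullptr while the list is empty or contended
    } while(v == nullptr);
    return v;
}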

include/gp/vfs/scheduler.hpp (+16, -1)

@@ -13,6 +13,8 @@ struct scheduler {
topic_list::node_ptr current;
size_t id;
system& sys;
process_data main_context_data;
topic_list::node main_context;
no_inline_decl(
void yield_to(topic_list::node_ptr target)
@@ -20,9 +22,22 @@ struct scheduler {
scheduler(class system&, size_t token);
[[noreturn]] void startup();
scheduler(scheduler&& v)
: previous(v.previous)
, current(v.current)
, id(v.id)
, sys(v.sys)
, main_context_data(gp::move(v.main_context_data))
, main_context(gp::move(v.main_context))
{}
void run();
void yield();
~scheduler() {
}
};
}

include/gp/vfs/scheduling/simple_lockfree_scheduling.hpp (+0, -44)

@@ -1,44 +0,0 @@
#pragma once
#include "gp/vfs/system.hpp"
namespace gp{
class simple_lockfree_scheduling : gp::scheduling_scheme {
gp::topic_list running;
gp::topic_list waiting;
gp::topic_list to_clean;
gp::topic_list naughty;
scheduler me;
public:
virtual gp::topic_list::node_ptr one(size_t) {
auto v = running.try_pop();
do{
if(v) return v;
v = running.try_pop();
}while(true);
}
virtual gp::topic_list::node_ptr next(size_t, gp::topic_list::node_ptr current) {
switch(current->value->state) {
case process_status::inactive:
case process_status::running:
do{}while(!running.try_push(current));
break;
case process_status::finished:
do{}while(!to_clean.try_push(current));
break;
case process_status::zombie:
do{}while(!naughty.try_push(current));
break;
case process_status::waiting:
do{}while(!waiting.try_push(current));
break;
}
return one(0);
}
virtual gp::scheduler& current_scheduler() {
return me;
}
};
}

include/gp/vfs/scheduling/simple_scheduling.hpp (+97, -0)

@@ -0,0 +1,97 @@
#pragma once
#include "gp/vfs/system.hpp"
namespace gp{
class simple_scheduling : public gp::scheduling_scheme {
struct locked_circular_buffer {
std::atomic_bool lock;
gp::array<gp::topic_list::node_ptr, gp_config::limits::max_processes> processes;
size_t read_idx = 0;
size_t write_idx = 0;
void push(gp::topic_list::node_ptr node) {
bool t = true;
bool f = false;
while(not lock.compare_exchange_strong(f,t)){}
{
gp_config::assertion((write_idx + 1)%processes.size() != read_idx, "bad push to circular buffer");
processes[write_idx] = node;
write_idx=(write_idx+1)%processes.size();
}
while(not lock.compare_exchange_strong(t,f)){}
}
gp::topic_list::node_ptr pop() {
bool t = true;
bool f = false;
gp::topic_list::node_ptr ret;
while(not lock.compare_exchange_strong(f,t)){}
{
if(read_idx == write_idx) {
ret = nullptr;
} else {
ret = processes[read_idx];
read_idx=(read_idx+1)%processes.size();
}
}
while(not lock.compare_exchange_strong(t,f)){}
return ret;
}
};
locked_circular_buffer running;
locked_circular_buffer waiting;
locked_circular_buffer to_clean;
locked_circular_buffer naughty;
system* sys = nullptr;
public:
simple_scheduling()
{}
virtual void link(system& value) {
gp_config::assertion(!(sys), "Double linkage detected");
sys = &value;
sys->process_managers.emplace_back(*sys,0);
}
virtual gp::topic_list::node_ptr one(size_t) {
auto v = running.pop();
do{
if(v) return v;
v = running.pop();
}while(true);
}
virtual void push(gp::topic_list::node_ptr node) {
running.push(node);
}
virtual gp::topic_list::node_ptr next(size_t, gp::topic_list::node_ptr current) {
switch(current->value->state) {
case process_status::inactive:
case process_status::running:
running.push(current);
break;
case process_status::finished:
to_clean.push(current);
break;
case process_status::zombie:
naughty.push(current);
break;
case process_status::waiting:
waiting.push(current);
break;
}
return one(0);
}
virtual gp::scheduler& current_scheduler() {
return sys->process_managers[0];
}
};
}
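
locked_circular_buffer above serializes access to its ring with a single std::atomic_bool used as a spinlock: a successful compare_exchange_strong from false to true takes the lock, and the reverse exchange drops it. The sketch below shows the same ring-plus-spinlock shape in self-contained standard C++ (the struct name, the int payload and the capacity are placeholders); note that `expected` has to be reset to false after every failed exchange for the lock to stay exclusive.

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Minimal spin-locked ring buffer, same idiom as locked_circular_buffer.
struct spin_ring {
    std::atomic_bool lock{false};
    std::array<int, 8> slots{};
    std::size_t read_idx = 0;
    std::size_t write_idx = 0;

    void acquire() {
        bool expected = false;
        while(!lock.compare_exchange_strong(expected, true)) {
            expected = false;        // reset: a failed CAS overwrites `expected`
        }
    }
    void release() {
        lock.store(false);           // a plain store is enough to unlock
    }
    void push(int v) {
        acquire();
        assert((write_idx + 1) % slots.size() != read_idx);   // ring must not be full
        slots[write_idx] = v;
        write_idx = (write_idx + 1) % slots.size();
        release();
    }
    bool pop(int& out) {
        acquire();
        const bool available = read_idx != write_idx;          // empty when cursors meet
        if(available) {
            out = slots[read_idx];
            read_idx = (read_idx + 1) % slots.size();
        }
        release();
        return available;
    }
};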

include/gp/vfs/system.hpp (+74, -30)

@@ -17,23 +17,39 @@ class scheduling_scheme {
public:
virtual topic_list::node_ptr one(size_t token) = 0;
virtual topic_list::node_ptr next(size_t token, topic_list::node_ptr current) = 0;
virtual void push(topic_list::node_ptr current) = 0;
virtual void link(class system&) = 0;
virtual scheduler& current_scheduler() = 0;
};
class system {
friend struct scheduler;
public:
gp::reference_wrapper<gp::allocator> system_allocator;
gp::vector<gp::filesystem*> filesystems{system_allocator};
gp::vector<gp::scheduler> process_managers;
scheduling_scheme& scheme;
topic_list::node main_context;
friend class scheduler;
public:
-system(allocator& v, scheduling_scheme& scheme_)
+system(allocator& v, scheduling_scheme& scheme_, gp::buffer<char> stack_estimate = gp::buffer<char>{nullptr, nullptr})
: system_allocator{v}
, process_managers{system_allocator}
, scheme{scheme_}
{}
{
[[maybe_unused]] volatile char a;
if(stack_estimate.size() == 0) {
auto seed = (char*)&a;
auto jump = (uintptr_t)seed % gp_config::limits::process_stack;
seed -= jump + (jump == 0)*gp_config::limits::process_stack;
auto page_cnt = 1;
if(jump == 0) page_cnt++;
stack_estimate = gp::buffer<char>{seed, (size_t)(gp_config::limits::process_stack*page_cnt)};
}
main_context.value = (process_data*)system_allocator.get().allocate(sizeof(process_data));
new(main_context.value) process_data(gp::function<void()>([]() -> void{}, nullopt), stack_estimate.begin().data, stack_estimate.size());
gp_config::assertion(main_context.value != nullptr, "failed to allocate return to main switch");
scheme.link(*this);
}
size_t spawn(gp::function<void()> fn) {
constexpr size_t stack_sz = gp_config::limits::process_stack;
@@ -48,33 +64,64 @@ public:
);
new(pp) topic_list::node();
pp->value = created_process;
auto pid = pp->value->pid;
scheme.push(pp);
return pid;
}
template<typename threading_function>
void run(threading_function thread_starter) {
for(auto& i : process_managers) {
gp::function<void(void)> runner{
[&](){
i.run();
},
system_allocator
};
thread_starter(
runner
);
}
}
-auto& sched = scheme.current_scheduler();
-sched.yield_to(scheme.next(sched.id, pp));
-return pp->value->pid;
void yield() {
scheme.current_scheduler().yield();
}
};
scheduler::scheduler(class system& v, size_t token)
: id(token)
, sys(v)
{}
, main_context_data{gp::function<void()>{[](){}, v.system_allocator}, nullptr, size_t(0)}
, main_context()
{
main_context.value = &main_context_data;
gp_config::assertion(!main_context.try_acquire(), "node should not be acquired on creation");
}
no_inline_decl(inline scheduler* spawner (scheduler* new_p)) {
auto& proc = *new_p->current->value;
if(proc.state == gp::process_status::inactive) {
proc.state = gp::process_status::running;
proc.fn();
proc.state = gp::process_status::finished;
}
return new_p;
}
void scheduler::yield_to(topic_list::node_ptr target)
{
previous = current;
current->value->specifics.pull();
current = target;
-no_inline_decl([&](scheduler* new_p)){
-*new_p->previous->release();
-auto& proc = *new_p->current->value;
-if(proc.state == gp::process_status::inactive) {
-proc.state = gp::process_status::running;
-proc.fn();
-proc.state = gp::process_status::finished;
-new_p->yield_to(new_p->sys.scheme.next(new_p->id, new_p->current));
-}
-}(static_cast<scheduler*>(target->value->specifics.push(this)));
auto new_p = this;
do{
new_p = spawner(static_cast<scheduler*>(target->value->specifics.push(new_p)));
target = new_p->sys.scheme.next(new_p->id, new_p->current);
new_p->previous = new_p->current;
new_p->current->value->specifics.pull();
new_p->current = target;
} while(true);
}
@@ -84,18 +131,15 @@ void scheduler::yield(){
}
-void scheduler::startup()
+void scheduler::run()
{
current = sys.scheme.one(id);
-no_inline_decl([&](scheduler* new_p)){
-auto& proc = *new_p->current->value;
-if(proc.state == gp::process_status::inactive) {
-proc.state = gp::process_status::running;
-proc.fn();
-proc.state = gp::process_status::finished;
-new_p->yield_to(new_p->sys.scheme.next(new_p->id, new_p->current));
-}
-}(static_cast<scheduler*>(target->value->specifics.push(this)));
main_context_data.pid = 0;
main_context_data.state = process_status::running;
main_context_data.specifics.pull();
current = &main_context;
sys.scheme.push(&main_context);
auto new_p = spawner(static_cast<scheduler*>(current->value->specifics.push(this)));
new_p->yield_to(new_p->sys.scheme.next(new_p->id, new_p->current));
}
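
Taken together with the scheduler changes above, the intended wiring is: construct a scheduling scheme, hand it to gp::system together with an allocator, spawn fibers, then call run() with a callable that starts one thread per scheduler. A compact sketch of that flow, modeled on the updated channel_test at the bottom of this page; the backing-store size and the sleep are illustrative, and the gp::buddy allocator header (not visible in this diff) is also required:

#include <atomic>
#include <chrono>
#include <thread>
#include <gp/array.hpp>
#include <gp/vfs/system.hpp>
#include <gp/vfs/scheduling/simple_scheduling.hpp>

using namespace std::chrono_literals;

static gp::array<char, 4096*512> backing_store;   // memory handed to the buddy allocator

int main() {
    gp::buddy<> alloc{&*backing_store.begin(), backing_store.size()};
    gp::simple_scheduling sched{};        // link() registers one scheduler on the system
    gp::system sys{alloc, sched};         // omitted stack_estimate: the ctor derives one from the caller's stack

    std::atomic_int ran = 0;
    sys.spawn(gp::function<void()>{[&](){ ran.fetch_add(1); }, alloc});

    // run() builds one runner per scheduler and hands it to the thread starter;
    // each runner loops scheduling fibers until the process exits.
    sys.run([](gp::function<void(void)> runner) {
        std::thread(runner).detach();
    });

    std::this_thread::sleep_for(16ms);    // give the detached runner time to execute the fiber
    return ran.load() ? 0 : 1;
}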

include/gp_config.hpp (+11, -1)

@@ -1,5 +1,6 @@
#pragma once
#include <new>
#include <type_traits>
#include <cstddef>
@@ -24,7 +25,16 @@ namespace gp_config{
namespace limits {
constexpr size_t max_processes = 4096;
constexpr size_t max_fd_per_process = 128;
-constexpr size_t process_stack = 1024; // times 16
+constexpr size_t process_stack = 1024;
constexpr size_t process_stack_align_to = 16;
constexpr size_t stack_grow_upwards = false;
#if __cpp_lib_hardware_interference_size >= 201603
constexpr size_t hardware_constructive_interference_size = std::hardware_constructive_interference_size;
constexpr size_t hardware_destructive_interference_size = std::hardware_destructive_interference_size;
#else
constexpr size_t hardware_constructive_interference_size = 128;
constexpr size_t hardware_destructive_interference_size = 128;
#endif
}
namespace memory_module{
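
The two interference-size constants exist so that alignment decisions (such as the alignas additions in runqueue.hpp above) can be expressed in one place: when the standard library advertises __cpp_lib_hardware_interference_size they forward to std::hardware_constructive_interference_size / std::hardware_destructive_interference_size, otherwise they fall back to 128. A trimmed illustration of how they are meant to be consumed; the struct and member names below are placeholders, not from the commit:

#include <atomic>
#include "gp_config.hpp"

// Mirrors the runqueue.hpp change: each hot atomic is aligned to its own
// interference-size boundary, so on typical targets the two members land
// on different cache lines.
struct queue_ends {
    alignas(gp_config::limits::hardware_constructive_interference_size)
        std::atomic<void*> head{nullptr};
    alignas(gp_config::limits::hardware_constructive_interference_size)
        std::atomic<void*> tail{nullptr};
};

static_assert(alignof(queue_ends) >= gp_config::limits::hardware_constructive_interference_size,
              "the member alignment request must propagate to the enclosing type");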

tests/channel_test.cpp (+59, -5)

@@ -4,32 +4,86 @@
#include <gp/array.hpp>
#include <gp/enveloppe/cbor.hpp>
#include <gp/vfs/system.hpp>
#include <gp/vfs/scheduling/simple_scheduling.hpp>
#include "test_scaffold.h"
#include <atomic>
#include <memory>
#include <vector>
#include <random>
#include <thread>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
using weight_t = float;
template<typename T>
struct node {
node(T _value)
: value(_value)
{}
std::vector<std::pair<std::shared_ptr<node>, weight_t>> next_links;
T value;
};
struct point {
float x, y;
};
std::thread* leaver_4989487;
std::atomic_int quit_signal_4989487 = 0;
gp::system* sys_ptr_4989487;
struct channel_test : public test_scaffold {
channel_test() {
name = __FILE__ ":1";
}
std::unique_ptr<
gp::array<char, 4096*512>,
std::default_delete<gp::array<char, 4096*512>>
> store = std::make_unique<gp::array<char, 4096*512>>();
gp::buddy<> alloc{&*store->begin(), store->size()};
gp::simple_scheduling sched{};
gp::system& sys = *(new(alloc.allocate(sizeof(gp::system))) gp::system{alloc, sched});
struct terminator{};
virtual int run() {
-auto store = std::make_unique<gp::array<char, 4096*512>>();
-gp::buddy<> alloc{&*store->begin(), store->size()};
-gp::system sys{alloc, 1};
std::atomic_int a = 0;
sys_ptr_4989487 = &sys;
sys.spawn(gp::function<void()>{[&](){
a.fetch_add(1);
}, alloc});
-sys.run_once();
sys.spawn(gp::function<void()>{[&](){
while(!quit_signal_4989487.load()) {
std::this_thread::sleep_for(500us);
}
pthread_exit(nullptr);
}, alloc});
sys.run([](gp::function<void(void)> a) {
leaver_4989487 = new std::thread(a);
});
leaver_4989487->detach();
std::this_thread::sleep_for(16ms);
gp_config::assertion(a.load(), "did not increment (aka run the fiber)");
return 0;
}
virtual ~channel_test() {
quit_signal_4989487.store(1);
std::this_thread::sleep_for(1s);
}
};
append_test dummy_hke41r43(new channel_test{});
