Browse Source

Added the paino lib to schedule coroutines and continuations

master
Ludovic 'Archivist' Lagouardette 4 years ago
parent
commit
73191a51f7
5 changed files with 362 additions and 1 deletions
  1. +19
    -1
      Makefile
  2. +157
    -0
      include/rigid_paradise/piano/function.hpp
  3. +14
    -0
      include/rigid_paradise/piano/piano.h
  4. +124
    -0
      src/libpiano/piano.cpp
  5. +48
    -0
      src/libpiano/test.cpp

+ 19
- 1
Makefile View File

@ -1,8 +1,10 @@
CXX = clang++
CXX = g++
AR = llvm-ar
CXXFLAGS = --std=c++17 -O3 -Iinclude
USE_THREADS = -pthread
USE_FILESYSTEM = -lc++fs
USE_SDL = -lSDL2
USE_PIANO = -L./build/lib/ -lpiano
all: build
@ -14,6 +16,8 @@ clean: clean_pomodoro
dirs:
@mkdir -p ./build/bin
@mkdir -p ./build/tmp
@mkdir -p ./build/lib
pomodoro: dirs
$(CXX) $(CXXFLAGS) $(USE_SDL) $(USE_THREADS) src/pomodoro/pomodoro.cpp -o ./build/bin/pomodoro_view
@ -27,6 +31,20 @@ yes: dirs
yes_clean:
-@rm ./build/bin/yes
libpiano: dirs
$(CXX) $(CXXFLAGS) -c src/libpiano/piano.cpp -o ./build/tmp/piano.o
$(AR) rc ./build/lib/libpiano.a ./build/tmp/piano.o
rm ./build/tmp/*
clean_libpiano:
-@rm ./build/tmp/piano.o ./build/lib/libpiano.a ./build/tmp/testpiano
test_piano: libpiano dirs
$(CXX) $(CXXFLAGS) $(USE_PIANO) $(USE_THREADS) src/libpiano/test.cpp -o ./build/tmp/testpiano
-./build/tmp/testpiano
-@rm ./build/tmp/*
astyle:
astyle --style=bsd --align-reference=type --align-pointer=type --break-blocks --indent-namespaces --indent=tab --add-brackets \
include/rigid_paradise/*.h \

+ 157
- 0
include/rigid_paradise/piano/function.hpp View File

@ -0,0 +1,157 @@
#pragma once
#include <functional>
#include <optional>
#include <list>
#include <mutex>
#include <memory>
#include <random>
namespace rp_piano {
class coro;
class abstract_channel;
template<typename T>
struct locked_list;
}
extern "C" {
void spawn(rp_piano::coro&&);
}
constexpr size_t queue_count = 4;
extern std::array<rp_piano::locked_list<rp_piano::coro>, queue_count> sched_queue;
namespace rp_piano {
template<typename T>
struct locked_list {
std::list<T> data;
std::mutex mtx;
std::optional<T> pop() {
std::lock_guard<std::mutex> _g(mtx);
if(data.size())
{
T ret = std::move(data.front());
data.pop_front();
return ret;
}
return std::nullopt;
}
void push(std::list<T>&& in) {
std::lock_guard<std::mutex> _g(mtx);
data.splice(std::end(data), in);
}
size_t size() {
std::lock_guard<std::mutex> _g(mtx);
return data.size();
}
};
class abstract_channel {
public:
std::shared_ptr<locked_list<coro>> waiters;
abstract_channel()
: waiters(std::make_shared<locked_list<coro>>())
{}
std::optional<coro> get_waiter();
void put_waiter(std::list<coro>);
void receive_cb();
virtual ~abstract_channel() = default;
};
template<typename T>
class channel : public abstract_channel {
locked_list<T> values;
public:
void send(T&& v) {
values.push({std::forward<T>(v)});
}
void send(std::list<T>&& v) {
values.push(std::forward<T>(v));
}
std::optional<T> receive() {
receive_cb();
return values.pop();
}
};
class coro {
using corotype = std::function<void(coro&)>;
std::list<coro> continuations;
corotype implementation = [](coro&){};
std::optional<abstract_channel> go_wait;
bool resched = false;
public:
template<typename T>
coro(T v)
: implementation(v)
{}
coro() = default;
coro& then(coro&& v)
{
continuations.emplace_back(std::move(v));
return continuations.back();
}
void operator() () {
static thread_local std::default_random_engine eng{};
if(go_wait)
{
go_wait.value().put_waiter({*this});
go_wait.reset();
return;
}
implementation(*this);
if(resched)
{
resched = false;
spawn(std::move(*this));
return;
}
if(go_wait)
{
go_wait.value().put_waiter({*this});
go_wait.reset();
} else {
sched_queue[eng() & (sched_queue.size() - 1)].push(std::move(continuations));
}
return;
}
coro& rewake_with(abstract_channel chan) {
go_wait = chan;
return *this;
}
void reschedule() {
resched = true;
}
};
inline void abstract_channel::receive_cb() {
static thread_local std::default_random_engine generator;
if(waiters)
{
while(true)
{
auto waitee = waiters->pop();
if(!waitee)
{
return;
}
sched_queue[generator() & (sched_queue.size() - 1)].push({waitee.value()});
}
}
}
}

+ 14
- 0
include/rigid_paradise/piano/piano.h View File

@ -0,0 +1,14 @@
#pragma once
#include <vector>
#include <string_view>
#include "rigid_paradise/piano/function.hpp"
extern "C" {
int piano(std::vector<std::string_view>&&);
void spawn(rp_piano::coro&&);
}
template<typename T>
rp_piano::coro start(T&& v) {
return rp_piano::coro(std::forward<T>(v));
}

+ 124
- 0
src/libpiano/piano.cpp View File

@ -0,0 +1,124 @@
#include "rigid_paradise/piano/piano.h"
#include "rigid_paradise/piano/function.hpp"
#include <thread>
#include <atomic>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
rp_piano::locked_list<rp_piano::abstract_channel> registry;
std::array<rp_piano::locked_list<rp_piano::coro>, queue_count> sched_queue;
std::atomic_bool running;
void clear_registry() {
auto sz = registry.data.size();
while( sz != 0 ){
sz--;
auto tmp = registry.pop();
if(tmp)
if(tmp.value().waiters.use_count() != 1)
registry.push( { tmp.value() } );
}
return;
}
void rp_piano::abstract_channel::put_waiter(std::list<rp_piano::coro> coros) {
waiters->push(std::move(coros));
}
std::optional<rp_piano::coro> rp_piano::abstract_channel::get_waiter() {
return waiters->pop();
}
void spawn(rp_piano::coro&& coro)
{
static thread_local std::default_random_engine eng{};
sched_queue[eng() & (sched_queue.size() - 1)].push({std::move(coro)});
}
int main(int argc, char** argv)
{
std::vector<std::string_view> args{argv, argv+argc};
std::vector<std::thread> threads;
std::vector<std::shared_ptr<std::atomic_int>> activity;
std::mutex mtx;
std::atomic_bool poll_activity = false;
running.store(true);
for(auto n = 0; n < std::thread::hardware_concurrency(); ++n)
{
activity.push_back(std::make_shared<std::atomic_int>());
threads.emplace_back([&, n](){
size_t idx;
while(running)
{
auto data = sched_queue[idx & (sched_queue.size() - 1)].pop();
if(data)
{
data.value()();
if(poll_activity)
{
activity[n]->fetch_add(1);
}
}
else
{
std::this_thread::yield();
}
++idx;
}
});
}
auto ret = piano(std::move(args));
while(running)
{
size_t cnt = 0;
for(auto& queue : sched_queue)
{
auto q = queue.size();
std::cout << q << "\t";
cnt+=q;
}
if(poll_activity)
{
bool test = false;
for(auto& a : activity)
{
test |= a->load();
}
if(!test)
{
if(!cnt)
{
running.store(false);
}
}
poll_activity.store(false);
}
else
{
if(!cnt)
{
for(auto& a : activity)
{
*a = 0;
}
poll_activity.store(true);
}
}
std::cout << "pa="<< poll_activity.load() << "\tr="<< running.load() << std::endl;
std::this_thread::sleep_for(2s);
}
for(auto& t : threads)
{
t.join();
}
return ret;
}

+ 48
- 0
src/libpiano/test.cpp View File

@ -0,0 +1,48 @@
#include "rigid_paradise/piano/piano.h"
#include <iostream>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
std::mutex cout_tex;
volatile int dummy;
void a()
{
dummy = 1;
}
int piano(std::vector<std::string_view>&& args) {
std::cout<<"Hello piano!"<<std::endl;
auto root = rp_piano::coro{
[](rp_piano::coro&){
std::lock_guard _l(cout_tex);
std::cout<<"Hello coro"<<std::endl;
}
};
root.then(
[](rp_piano::coro&){
std::lock_guard _l(cout_tex);
std::cout<<"Hello continuation"<<std::endl;
}
).then(
[](rp_piano::coro&){
std::lock_guard _l(cout_tex);
std::cout<<"Hello continuation of continuation"<<std::endl;
auto i = 100000000;
while(i>0) {
spawn(
[=](rp_piano::coro& me){
a();
me.reschedule();
}
);
--i;
}
}
);
spawn(
std::move(root)
);
return 0;
}

Loading…
Cancel
Save