completed

This commit is contained in:
chatlanin 2022-04-14 13:35:49 +03:00
parent 40399456ba
commit 49852dcac0
13 changed files with 174 additions and 160 deletions

View File

@ -33,14 +33,15 @@ int main(int argc, char *argv[])
{
hack::log()("run example");
message ms;
ms.data = { 1, 5, 7, 9, 2, 4, 6, 8 };
sorting sort;
actorfm::actor_controller::instance().set_actro(sort);
message ms { result_t{ 1, 5, 7, 9, 2, 4, 6, 8 } };
actorfm::actor_controller::instance().set_actor(sort);
auto fut = sort.expect(ms);
hack::log(": ")("final result", fut.get());
actorfm::actor_controller::instance().remove_actor(sort);
actorfm::actor_controller::instance().destroy();
hack::log()("completed");
}

View File

@ -1,6 +1,6 @@
src = [ 'main.cpp' ]
deps += actfm_dep
deps += actorfm_dep
executable(
'actorfm',

View File

@ -1,4 +1,3 @@
# https://pixorblog.wordpress.com/2019/07/27/a-meson-starter-script-for-c-projects
project(
'actorfm',
'cpp',

View File

@ -9,30 +9,32 @@
namespace actorfm
{
using task_pool = threadsafe_queue<std::unique_ptr<callable>>;
template<typename message_t, typename result_t>
class actor
{
public:
actor() = default;
virtual ~actor() {}
public:
virtual result_t invoke(message_t ms, actor* ac = nullptr) = 0;
void send(message_t ms, actor* ac = nullptr)
{}
std::future<result_t> expect(message_t ms, actor* ac = nullptr)
{
auto func = [this, ms, ac]() { this->invoke(ms, ac); }; // TODO:: make std::move(ms)
std::packaged_task<result_t()> pt = std::packaged_task<result_t()>{ func }; // TODO:: make result_t()
auto func = [this, ms, ac]() { return this->invoke(ms, ac); }; // TODO:: maybe make std::move(ms)
std::packaged_task<result_t()> pt = std::packaged_task<result_t()>{ func };
auto fut = pt.get_future();
std::unique_ptr<callable> task { new callable { std::move(fut) } };
std::unique_ptr<callable> task { new callable { std::move(pt) } };
task_queue.push(std::move(task));
return fut;
}
task_pool* get_task_queue() { return &task_queue; }
private:
threadsafe_queue<callable> task_queue;
task_pool task_queue;
};
}

View File

@ -14,11 +14,16 @@ namespace actorfm
static actor_controller& instance() { static actor_controller ac; return ac; }
template<typename actor_t>
void set_actro(actor_t& actor)
void set_actor(actor_t& actor)
{
sch->set_actor(actor);
}
template<typename actor_t>
void remove_actor(actor_t& actor) { sch->remove_actor(actor); }
void destroy() { sch->destroy(); }
private:
std::unique_ptr<scheduler> sch;
};

View File

@ -1,2 +1,4 @@
#pragma once
#include "actor_impl/actor.hpp"
#include "actor_impl/actor_controller.hpp"

View File

@ -2,11 +2,16 @@
#include <memory>
namespace actorfm
{
class callable
{
public:
template<typename func>
callable(func&& f) : impl { std::make_unique<callable_base>(new callable_impl<func>(std::move(f))) } {}
callable(func&& f)
{
impl = std::unique_ptr<callable_base>{new callable_impl<func>(std::move(f))};
}
callable(callable&& other) : impl { std::move(other.impl) } {}
callable() = default;
callable& operator=(callable&& other)
@ -27,7 +32,7 @@ class callable
struct callable_base
{
virtual void call() = 0;
virtual ~callable_base();
virtual ~callable_base() {};
};
template<typename func>
@ -38,5 +43,14 @@ class callable
func f;
};
template<typename func>
struct callable_impl<std::unique_ptr<func>> : callable_base
{
callable_impl(func f_) : f { std::move(f_) } {};
void call() override { f(); }
func f;
};
std::unique_ptr<callable_base> impl;
};
}

View File

@ -4,13 +4,13 @@ subdir('thread_pool')
deps += thread_pool_dep
lib = library(
'actfm',
'actorfm',
include_directories : inc,
dependencies : deps,
cpp_args: args
)
actfm_dep = declare_dependency(
actorfm_dep = declare_dependency(
include_directories: inc,
link_with: lib,
)

View File

@ -3,6 +3,7 @@
#include <memory>
#include "thread_pool/thread_pool.hpp"
#include "threadsafe_containers/threadsafe_list.hpp"
#include "logger/logger.hpp"
@ -11,7 +12,7 @@ namespace actorfm
class scheduler
{
public:
scheduler() : th_pool { std::make_unique<thread_pool>() }
scheduler() : th_pool { std::make_unique<thread_pool>() } , done { true }
{
auto f = [this]() { return this->schedul(); };
th_pool->submit(callable { std::move(f) });
@ -20,23 +21,38 @@ namespace actorfm
template<typename actor_t>
void set_actor(actor_t& actor)
{
hack::log()("ref semantic");
if (done)
work_list.push_front(actor.get_task_queue());
}
template<typename actor_t>
void set_actor(actor_t&& actor)
void remove_actor(actor_t& actor)
{
hack::log()("move semantic");
work_list.remove_if([&actor](threadsafe_queue<std::unique_ptr<callable>> *tasks) { return tasks == actor.get_task_queue(); });
}
void destroy() { done = false; }
private:
void schedul()
{
hack::log()("shudle");
while (done)
{
work_list.for_each([this](threadsafe_queue<std::unique_ptr<callable>> *tasks) {
while (!tasks->empty())
{
std::shared_ptr<std::unique_ptr<callable>> task = tasks->wait_and_pop();
th_pool->submit(std::move(*(*task)));
task = nullptr;
}
});
}
}
private:
std::unique_ptr<thread_pool> th_pool;
threadsafe_list<threadsafe_queue<std::unique_ptr<callable>>*> work_list;
std::atomic<bool> done;
};
}

View File

@ -1,10 +1,13 @@
#include "thread_pool.hpp"
#include "logger/logger.hpp"
namespace actorfm
{
thread_pool::thread_pool() : jn { th }
thread_pool::thread_pool() : done { false }, jn { th }
{
const auto thread_count = std::thread::hardware_concurrency();
auto thread_count = std::thread::hardware_concurrency();
thread_count = thread_count == 0 ? 2 : thread_count;
try
{
@ -13,14 +16,13 @@ namespace actorfm
}
catch (std::exception& e)
{
hack::log()(e.what());
hack::error()(e.what());
}
}
thread_pool::~thread_pool()
{
done = true;
hack::log()("thread_pool completed");
}
void thread_pool::worker()

View File

@ -6,8 +6,6 @@
#include "callable.hpp"
#include "threadsafe_containers/threadsafe_queue.hpp"
#include "logger/logger.hpp"
namespace actorfm
{
class thread_pool
@ -39,7 +37,8 @@ namespace actorfm
private:
threads& th;
};
std::atomic<bool> done { false };
std::atomic<bool> done;
threads th;
pool pl;
joiner jn;

View File

@ -4,6 +4,8 @@
#include <mutex>
#include <condition_variable>
namespace actorfm
{
template<typename data_t>
class threadsafe_list
{
@ -11,7 +13,7 @@ class threadsafe_list
threadsafe_list() = default;
~threadsafe_list()
{
remove_if([](node const &) { return true; });
};
threadsafe_list(const threadsafe_list&) = delete;
@ -21,6 +23,9 @@ class threadsafe_list
void push_front(const data_t& value)
{
std::unique_ptr<node> new_node { new node { value } };
std::lock_guard lk { head.m };
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}
template <typename func>
@ -38,23 +43,6 @@ class threadsafe_list
}
}
template <typename func>
std::shared_ptr<data_t> find_first_if(func f)
{
node *current = &head;
std::unique_lock lk { head.m };
while (node *const next = current->next.get())
{
std::unique_lock next_lk { next->m };
lk.unlock();
if (f(*next->data))
return next->data;
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<data_t>();
}
template <typename func>
void remove_if(func f)
{
@ -86,7 +74,6 @@ class threadsafe_list
std::unique_ptr<node> next;
node() : next() {}
node(const data_t& value) : data { std::make_shared<data_t>(value) } {};
} head;
};
node head;
};
}

View File

@ -18,7 +18,7 @@ namespace actorfm
public:
std::shared_ptr<data_t> try_pop()
{
std::unique_ptr<data_t> old_head = try_pop_head();
std::unique_ptr<node> old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<data_t>();
}
@ -26,9 +26,7 @@ namespace actorfm
{
std::unique_ptr<node> old_head = try_pop_head(value);
if (old_head)
{
return true;
}
return false;
}
@ -38,19 +36,14 @@ namespace actorfm
return old_head->data;
}
void wait_and_pop(data_t& value)
{
std::unique_ptr<node> old_head = wait_pop_head(value);
}
void push(data_t new_value)
{
std::shared_ptr<data_t> new_data { std::make_shared<data_t>(std::move(new_value)) };
std::unique_ptr<node> p { new node };
{
std::lock_guard tail_lock { tail_mutex };
tail->data = new_data;
node* const new_tail = p.get();
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
@ -60,7 +53,7 @@ namespace actorfm
bool empty()
{
std::lock_guard head_lock { head_mutex };
return (head == get_tail());
return head.get() == get_tail();
}
private:
@ -73,18 +66,11 @@ namespace actorfm
std::condition_variable cv;
std::mutex head_mutex;
std::mutex tail_mutex;
std::unique_ptr<node> head;
node* tail;
private:
std::unique_ptr<node> try_pop_head()
{
std::lock_guard head_lock { head_mutex };
if(head.get() == get_tail())
return std::unique_ptr<node>();
return pop_head();
}
std::unique_ptr<node> try_pop_head(data_t& value)
{
std::lock_guard head_lock { head_mutex };
@ -101,10 +87,18 @@ namespace actorfm
return old_head;
}
std::unique_ptr<node> try_pop_head()
{
std::lock_guard head_lock { head_mutex };
if(head.get() == get_tail())
return std::unique_ptr<node>();
return pop_head();
}
std::unique_lock<std::mutex> wait_for_data()
{
std::unique_lock head_lock { head_mutex };
cv.wait(head_lock, [&] { return head != get_tail(); });
cv.wait(head_lock, [&] { return head.get() != get_tail(); });
return head_lock;
}
@ -114,13 +108,6 @@ namespace actorfm
return pop_head();
}
std::unique_ptr<node> wait_pop_head(data_t& value)
{
std::unique_lock head_lock { wait_for_data() };
value = std::move(*head->data);
return pop_head();
}
node* get_tail()
{
std::lock_guard tail_lock { tail_mutex };