diff --git a/bin/main.cpp b/bin/main.cpp index d0aec31..b557633 100644 --- a/bin/main.cpp +++ b/bin/main.cpp @@ -33,14 +33,15 @@ int main(int argc, char *argv[]) { hack::log()("run example"); - message ms; - ms.data = { 1, 5, 7, 9, 2, 4, 6, 8 }; - sorting sort; - actorfm::actor_controller::instance().set_actro(sort); + message ms { result_t{ 1, 5, 7, 9, 2, 4, 6, 8 } }; + actorfm::actor_controller::instance().set_actor(sort); auto fut = sort.expect(ms); hack::log(": ")("final result", fut.get()); + actorfm::actor_controller::instance().remove_actor(sort); + actorfm::actor_controller::instance().destroy(); + hack::log()("completed"); } diff --git a/bin/meson.build b/bin/meson.build index 64c8ac4..b3f4067 100644 --- a/bin/meson.build +++ b/bin/meson.build @@ -1,6 +1,6 @@ src = [ 'main.cpp' ] -deps += actfm_dep +deps += actorfm_dep executable( 'actorfm', diff --git a/meson.build b/meson.build index 2bb4e1f..5e40aa3 100644 --- a/meson.build +++ b/meson.build @@ -1,4 +1,3 @@ -# https://pixorblog.wordpress.com/2019/07/27/a-meson-starter-script-for-c-projects project( 'actorfm', 'cpp', diff --git a/src/actor_impl/actor.hpp b/src/actor_impl/actor.hpp index 82acc85..b7fbb07 100644 --- a/src/actor_impl/actor.hpp +++ b/src/actor_impl/actor.hpp @@ -9,30 +9,32 @@ namespace actorfm { + using task_pool = threadsafe_queue>; + template class actor { public: + actor() = default; virtual ~actor() {} public: virtual result_t invoke(message_t ms, actor* ac = nullptr) = 0; - void send(message_t ms, actor* ac = nullptr) - {} - std::future expect(message_t ms, actor* ac = nullptr) { - auto func = [this, ms, ac]() { this->invoke(ms, ac); }; // TODO:: make std::move(ms) - std::packaged_task pt = std::packaged_task{ func }; // TODO:: make result_t() - auto fut = pt.get_future(); - std::unique_ptr task { new callable { std::move(fut) } }; + auto func = [this, ms, ac]() { return this->invoke(ms, ac); }; // TODO:: maybe make std::move(ms) + std::packaged_task pt = std::packaged_task{ func }; + auto fut = pt.get_future(); + std::unique_ptr task { new callable { std::move(pt) } }; task_queue.push(std::move(task)); return fut; } + task_pool* get_task_queue() { return &task_queue; } + private: - threadsafe_queue task_queue; + task_pool task_queue; }; } diff --git a/src/actor_impl/actor_controller.hpp b/src/actor_impl/actor_controller.hpp index f73bd4a..abb0320 100644 --- a/src/actor_impl/actor_controller.hpp +++ b/src/actor_impl/actor_controller.hpp @@ -14,11 +14,16 @@ namespace actorfm static actor_controller& instance() { static actor_controller ac; return ac; } template - void set_actro(actor_t& actor) + void set_actor(actor_t& actor) { sch->set_actor(actor); } + template + void remove_actor(actor_t& actor) { sch->remove_actor(actor); } + + void destroy() { sch->destroy(); } + private: std::unique_ptr sch; }; diff --git a/src/actorfm.hpp b/src/actorfm.hpp index fb852e9..a9cf400 100644 --- a/src/actorfm.hpp +++ b/src/actorfm.hpp @@ -1,2 +1,4 @@ +#pragma once + #include "actor_impl/actor.hpp" #include "actor_impl/actor_controller.hpp" diff --git a/src/callable.hpp b/src/callable.hpp index f3b4fa9..851f9f6 100644 --- a/src/callable.hpp +++ b/src/callable.hpp @@ -2,41 +2,55 @@ #include -class callable +namespace actorfm { - public: - template - callable(func&& f) : impl { std::make_unique(new callable_impl(std::move(f))) } {} - callable(callable&& other) : impl { std::move(other.impl) } {} - callable() = default; - callable& operator=(callable&& other) - { - impl = std::move(other.impl); - return *this; - } + class callable + { + public: + template + callable(func&& f) + { + impl = std::unique_ptr{new callable_impl(std::move(f))}; + } + callable(callable&& other) : impl { std::move(other.impl) } {} + callable() = default; + callable& operator=(callable&& other) + { + impl = std::move(other.impl); + return *this; + } - callable(const callable&) = delete; - callable(callable&) = delete; - callable& operator=(const callable&) = delete; + callable(const callable&) = delete; + callable(callable&) = delete; + callable& operator=(const callable&) = delete; - public: - void operator()() { impl->call(); } + public: + void operator()() { impl->call(); } - private: - struct callable_base - { - virtual void call() = 0; - virtual ~callable_base(); - }; + private: + struct callable_base + { + virtual void call() = 0; + virtual ~callable_base() {}; + }; - template - struct callable_impl : callable_base - { - callable_impl(func&& f_) : f { std::move(f_) } {}; - void call() override { f(); } - func f; - }; + template + struct callable_impl : callable_base + { + callable_impl(func&& f_) : f { std::move(f_) } {}; + void call() override { f(); } + func f; + }; - std::unique_ptr impl; -}; + template + struct callable_impl> : callable_base + { + callable_impl(func f_) : f { std::move(f_) } {}; + void call() override { f(); } + func f; + }; + + std::unique_ptr impl; + }; +} diff --git a/src/meson.build b/src/meson.build index 7db122e..e0eb3bc 100644 --- a/src/meson.build +++ b/src/meson.build @@ -4,13 +4,13 @@ subdir('thread_pool') deps += thread_pool_dep lib = library( - 'actfm', + 'actorfm', include_directories : inc, dependencies : deps, cpp_args: args ) -actfm_dep = declare_dependency( +actorfm_dep = declare_dependency( include_directories: inc, link_with: lib, ) diff --git a/src/scheduler.hpp b/src/scheduler.hpp index b375b79..dd6602d 100644 --- a/src/scheduler.hpp +++ b/src/scheduler.hpp @@ -3,6 +3,7 @@ #include #include "thread_pool/thread_pool.hpp" +#include "threadsafe_containers/threadsafe_list.hpp" #include "logger/logger.hpp" @@ -11,7 +12,7 @@ namespace actorfm class scheduler { public: - scheduler() : th_pool { std::make_unique() } + scheduler() : th_pool { std::make_unique() } , done { true } { auto f = [this]() { return this->schedul(); }; th_pool->submit(callable { std::move(f) }); @@ -20,23 +21,38 @@ namespace actorfm template void set_actor(actor_t& actor) { - hack::log()("ref semantic"); + if (done) + work_list.push_front(actor.get_task_queue()); } template - void set_actor(actor_t&& actor) - { - hack::log()("move semantic"); + void remove_actor(actor_t& actor) + { + work_list.remove_if([&actor](threadsafe_queue> *tasks) { return tasks == actor.get_task_queue(); }); } + void destroy() { done = false; } + private: void schedul() { - hack::log()("shudle"); + while (done) + { + work_list.for_each([this](threadsafe_queue> *tasks) { + while (!tasks->empty()) + { + std::shared_ptr> task = tasks->wait_and_pop(); + th_pool->submit(std::move(*(*task))); + task = nullptr; + } + }); + } } private: std::unique_ptr th_pool; + threadsafe_list>*> work_list; + std::atomic done; }; } diff --git a/src/thread_pool/thread_pool.cpp b/src/thread_pool/thread_pool.cpp index ccdb048..f3ad2aa 100644 --- a/src/thread_pool/thread_pool.cpp +++ b/src/thread_pool/thread_pool.cpp @@ -1,10 +1,13 @@ #include "thread_pool.hpp" +#include "logger/logger.hpp" + namespace actorfm { - thread_pool::thread_pool() : jn { th } + thread_pool::thread_pool() : done { false }, jn { th } { - const auto thread_count = std::thread::hardware_concurrency(); + auto thread_count = std::thread::hardware_concurrency(); + thread_count = thread_count == 0 ? 2 : thread_count; try { @@ -13,14 +16,13 @@ namespace actorfm } catch (std::exception& e) { - hack::log()(e.what()); + hack::error()(e.what()); } } thread_pool::~thread_pool() { done = true; - hack::log()("thread_pool completed"); } void thread_pool::worker() diff --git a/src/thread_pool/thread_pool.hpp b/src/thread_pool/thread_pool.hpp index c70bf59..a20786e 100644 --- a/src/thread_pool/thread_pool.hpp +++ b/src/thread_pool/thread_pool.hpp @@ -6,8 +6,6 @@ #include "callable.hpp" #include "threadsafe_containers/threadsafe_queue.hpp" -#include "logger/logger.hpp" - namespace actorfm { class thread_pool @@ -39,7 +37,8 @@ namespace actorfm private: threads& th; }; - std::atomic done { false }; + + std::atomic done; threads th; pool pl; joiner jn; diff --git a/src/threadsafe_containers/threadsafe_list.hpp b/src/threadsafe_containers/threadsafe_list.hpp index e294d01..cb415b7 100644 --- a/src/threadsafe_containers/threadsafe_list.hpp +++ b/src/threadsafe_containers/threadsafe_list.hpp @@ -4,89 +4,76 @@ #include #include -template -class threadsafe_list +namespace actorfm { - public: - threadsafe_list() = default; - ~threadsafe_list() - { - - }; - - threadsafe_list(const threadsafe_list&) = delete; - threadsafe_list& operator=(const threadsafe_list&) = delete; - - public: - void push_front(const data_t& value) - { - std::unique_ptr new_node { new node { value } }; - } - - template - void for_each(func f) - { - node *current = &head; - std::unique_lock lk { head.m }; - while (node* const next = current->next.get()) + template + class threadsafe_list + { + public: + threadsafe_list() = default; + ~threadsafe_list() { - std::unique_lock next_lk { next->m }; - lk.unlock(); - f(*next->data); - current = next; - lk = std::move(next_lk); + remove_if([](node const &) { return true; }); + }; + + threadsafe_list(const threadsafe_list&) = delete; + threadsafe_list& operator=(const threadsafe_list&) = delete; + + public: + void push_front(const data_t& value) + { + std::unique_ptr new_node { new node { value } }; + std::lock_guard lk { head.m }; + new_node->next = std::move(head.next); + head.next = std::move(new_node); } - } - template - std::shared_ptr find_first_if(func f) - { - node *current = &head; - std::unique_lock lk { head.m }; - while (node *const next = current->next.get()) + template + void for_each(func f) { - std::unique_lock next_lk { next->m }; - lk.unlock(); - if (f(*next->data)) - return next->data; - current = next; - lk = std::move(next_lk); - } - return std::shared_ptr(); - } - - template - void remove_if(func f) - { - node *current = &head; - std::unique_lock lk { head.m }; - while (node *const next = current->next.get()) - { - std::unique_lock next_lk { next->m }; - if (f(*next->data)) - { - std::unique_ptr old_next = std::move(current->next); - current->next = std::move(next->next); - next_lk.unlock(); - } - else + node *current = &head; + std::unique_lock lk { head.m }; + while (node* const next = current->next.get()) { + std::unique_lock next_lk { next->m }; lk.unlock(); + f(*next->data); current = next; lk = std::move(next_lk); } } - } - private: - struct node - { - std::mutex m; - std::shared_ptr data; - std::unique_ptr next; - node() : next() {} - node(const data_t& value) : data { std::make_shared(value) } {}; - }; + template + void remove_if(func f) + { + node *current = &head; + std::unique_lock lk { head.m }; + while (node *const next = current->next.get()) + { + std::unique_lock next_lk { next->m }; + if (f(*next->data)) + { + std::unique_ptr old_next = std::move(current->next); + current->next = std::move(next->next); + next_lk.unlock(); + } + else + { + lk.unlock(); + current = next; + lk = std::move(next_lk); + } + } + } - node head; -}; + private: + struct node + { + std::mutex m; + std::shared_ptr data; + std::unique_ptr next; + node() : next() {} + node(const data_t& value) : data { std::make_shared(value) } {}; + } head; + }; +} diff --git a/src/threadsafe_containers/threadsafe_queue.hpp b/src/threadsafe_containers/threadsafe_queue.hpp index c99a61d..6272a3a 100644 --- a/src/threadsafe_containers/threadsafe_queue.hpp +++ b/src/threadsafe_containers/threadsafe_queue.hpp @@ -18,7 +18,7 @@ namespace actorfm public: std::shared_ptr try_pop() { - std::unique_ptr old_head = try_pop_head(); + std::unique_ptr old_head = try_pop_head(); return old_head ? old_head->data : std::shared_ptr(); } @@ -26,9 +26,7 @@ namespace actorfm { std::unique_ptr old_head = try_pop_head(value); if (old_head) - { return true; - } return false; } @@ -38,19 +36,14 @@ namespace actorfm return old_head->data; } - void wait_and_pop(data_t& value) - { - std::unique_ptr old_head = wait_pop_head(value); - } - void push(data_t new_value) { std::shared_ptr new_data { std::make_shared(std::move(new_value)) }; std::unique_ptr p { new node }; { std::lock_guard tail_lock { tail_mutex }; - tail->data = new_data; node* const new_tail = p.get(); + tail->data = new_data; tail->next = std::move(p); tail = new_tail; } @@ -60,7 +53,7 @@ namespace actorfm bool empty() { std::lock_guard head_lock { head_mutex }; - return (head == get_tail()); + return head.get() == get_tail(); } private: @@ -73,18 +66,11 @@ namespace actorfm std::condition_variable cv; std::mutex head_mutex; std::mutex tail_mutex; + std::unique_ptr head; node* tail; private: - std::unique_ptr try_pop_head() - { - std::lock_guard head_lock { head_mutex }; - if(head.get() == get_tail()) - return std::unique_ptr(); - return pop_head(); - } - std::unique_ptr try_pop_head(data_t& value) { std::lock_guard head_lock { head_mutex }; @@ -101,10 +87,18 @@ namespace actorfm return old_head; } + std::unique_ptr try_pop_head() + { + std::lock_guard head_lock { head_mutex }; + if(head.get() == get_tail()) + return std::unique_ptr(); + return pop_head(); + } + std::unique_lock wait_for_data() { std::unique_lock head_lock { head_mutex }; - cv.wait(head_lock, [&] { return head != get_tail(); }); + cv.wait(head_lock, [&] { return head.get() != get_tail(); }); return head_lock; } @@ -114,13 +108,6 @@ namespace actorfm return pop_head(); } - std::unique_ptr wait_pop_head(data_t& value) - { - std::unique_lock head_lock { wait_for_data() }; - value = std::move(*head->data); - return pop_head(); - } - node* get_tail() { std::lock_guard tail_lock { tail_mutex };