add callable

This commit is contained in:
chatlanin 2022-04-13 16:12:11 +03:00
parent 245a610d26
commit 40399456ba
8 changed files with 128 additions and 33 deletions

View File

@ -12,18 +12,20 @@ struct message
class sorting : public actorfm::actor<message, result_t>
{
public:
result_t set_message(message ms, actorfm::actor<message, result_t>* ac = nullptr) override
result_t invoke(message ms, actorfm::actor<message, result_t>* ac = nullptr) override
{
result_t res;
std::sort(ms.data.begin(), ms.data.end());
if (ac)
{}
else
{
std::sort(ms.data.begin(), ms.data.end());
std::future<result_t> fut = ac->expect(ms);
res = fut.get();
}
return res;
else
res = ms.data;
return res;
}
};
@ -35,9 +37,10 @@ int main(int argc, char *argv[])
ms.data = { 1, 5, 7, 9, 2, 4, 6, 8 };
sorting sort;
sort.set_message(ms);
actorfm::actor_controller::instance().set_actro(sort);
auto fut = sort.expect(ms);
hack::log(": ")("final result", fut.get());
hack::log()("completed");
}

View File

@ -2,6 +2,10 @@
#include <vector>
#include <thread>
#include <future>
#include "threadsafe_containers/threadsafe_queue.hpp"
#include "callable.hpp"
namespace actorfm
{
@ -12,13 +16,23 @@ namespace actorfm
virtual ~actor() {}
public:
virtual result_t set_message(message_t ms, actor* ac = nullptr) = 0;
virtual result_t invoke(message_t ms, actor* ac = nullptr) = 0;
void send(message_t ms, actor* ac = nullptr)
{}
result_t expect(message_t ms, actor* ac = nullptr)
{}
std::future<result_t> expect(message_t ms, actor* ac = nullptr)
{
auto func = [this, ms, ac]() { this->invoke(ms, ac); }; // TODO:: make std::move(ms)
std::packaged_task<result_t()> pt = std::packaged_task<result_t()>{ func }; // TODO:: make result_t()
auto fut = pt.get_future();
std::unique_ptr<callable> task { new callable { std::move(fut) } };
task_queue.push(std::move(task));
return fut;
}
private:
threadsafe_queue<callable> task_queue;
};
}

42
src/callable.hpp Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include <memory>
class callable
{
public:
template<typename func>
callable(func&& f) : impl { std::make_unique<callable_base>(new callable_impl<func>(std::move(f))) } {}
callable(callable&& other) : impl { std::move(other.impl) } {}
callable() = default;
callable& operator=(callable&& other)
{
impl = std::move(other.impl);
return *this;
}
callable(const callable&) = delete;
callable(callable&) = delete;
callable& operator=(const callable&) = delete;
public:
void operator()() { impl->call(); }
private:
struct callable_base
{
virtual void call() = 0;
virtual ~callable_base();
};
template<typename func>
struct callable_impl : callable_base
{
callable_impl(func&& f_) : f { std::move(f_) } {};
void call() override { f(); }
func f;
};
std::unique_ptr<callable_base> impl;
};

View File

@ -3,9 +3,6 @@ inc += include_directories('.')
subdir('thread_pool')
deps += thread_pool_dep
subdir('threadsafe_queue')
deps += threadsafe_queue_dep
lib = library(
'actfm',
include_directories : inc,

View File

@ -13,8 +13,8 @@ namespace actorfm
public:
scheduler() : th_pool { std::make_unique<thread_pool>() }
{
auto f = [this]() { return this->schedul(); };
th_pool->submit(callable { std::move(f) });
}
template<typename actor_t>
@ -29,6 +29,12 @@ namespace actorfm
hack::log()("move semantic");
}
private:
void schedul()
{
hack::log()("shudle");
}
private:
std::unique_ptr<thread_pool> th_pool;
};

View File

@ -2,14 +2,14 @@
namespace actorfm
{
thread_pool::thread_pool()
thread_pool::thread_pool() : jn { th }
{
const auto thread_count = std::thread::hardware_concurrency();
try
{
for (auto i = 0u; i < thread_count; ++i)
pool.emplace_back(std::thread(&thread_pool::worker, this));
th.emplace_back(std::thread(&thread_pool::worker, this));
}
catch (std::exception& e)
{
@ -27,7 +27,14 @@ namespace actorfm
{
while (!done)
{
hack::log()("run thread");
callable task;
if (pl.try_pop(task)) task();
else std::this_thread::yield();
}
};
}
void thread_pool::submit(callable task)
{
pl.push(std::move(task));
}
}

View File

@ -3,23 +3,45 @@
#include <vector>
#include <thread>
#include "callable.hpp"
#include "threadsafe_containers/threadsafe_queue.hpp"
#include "logger/logger.hpp"
namespace actorfm
{
class thread_pool
{
using Pool = std::vector<std::thread>;
using threads = std::vector<std::thread>;
using pool = threadsafe_queue<callable>;
public:
thread_pool();
~thread_pool();
private:
void worker();
public:
void submit(callable);
private:
Pool pool;
bool done { false };
private:
void worker();
private:
class joiner
{
public:
explicit joiner(threads& th_) : th { th_ } {}
~joiner()
{
for (auto& t : th)
if (t.joinable()) t.join();
}
private:
threads& th;
};
std::atomic<bool> done { false };
threads th;
pool pl;
joiner jn;
};
}

View File

@ -18,25 +18,29 @@ namespace actorfm
public:
std::shared_ptr<data_t> try_pop()
{
std::unique_ptr<data_t> const old_head = try_pop_head();
return old_head ? old_head->data:std::shared_ptr<data_t>();
std::unique_ptr<data_t> old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<data_t>();
}
bool try_pop(data_t& value)
{
std::unique_ptr<node> const old_head = try_pop_head(value);
return old_head;
std::unique_ptr<node> old_head = try_pop_head(value);
if (old_head)
{
return true;
}
return false;
}
std::shared_ptr<data_t> wait_and_pop()
{
std::unique_ptr<node> const old_head = wait_pop_head();
std::unique_ptr<node> old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(data_t& value)
{
std::unique_ptr<node> const old_head = wait_pop_head(value);
std::unique_ptr<node> old_head = wait_pop_head(value);
}
void push(data_t new_value)
@ -45,7 +49,7 @@ namespace actorfm
std::unique_ptr<node> p { new node };
{
std::lock_guard tail_lock { tail_mutex };
tail->data=new_data;
tail->data = new_data;
node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
@ -92,7 +96,7 @@ namespace actorfm
std::unique_ptr<node> pop_head()
{
std::unique_ptr<node> const old_head = std::move(head);
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}