2 #include <ossia/dataflow/graph_node.hpp>
3 #include <ossia/detail/fmt.hpp>
4 #include <ossia/detail/lockfree_queue.hpp>
5 #include <ossia/detail/thread.hpp>
7 #include <boost/container/static_vector.hpp>
9 #include <blockingconcurrentqueue.h>
10 #include <concurrentqueue.h>
11 #include <smallfun.hpp>
// Compile-time switch. When defined, a task whose graph_node is not
// enabled() is not pushed to the work queue. run() and process_done()
// collect such tasks in a `toCleanup` list instead (see the
// DISABLE_DONE_TASKS sections below).
15 #define DISABLE_DONE_TASKS
// Callable that the executor invokes once per task, passing the task's
// graph_node (see executor::execute). The second template argument is the
// storage size given to smallfun::function: four pointers' worth.
// NOTE(review): presumably sized so the node_exec* functors used by
// custom_parallel_update fit in place — confirm.
23 using task_function = smallfun::function<void(ossia::graph_node&),
sizeof(
void*) * 4>;
// Tasks are not copyable. They hold std::atomic members, which are
// themselves non-copyable.
31 task(
const task&) =
delete;
// Move constructor. std::atomic members cannot be moved, so the current
// value of m_remaining_dependencies is transferred with load().
// NOTE(review): m_executed is not transferred and keeps its default (false).
// executor::run() resets it before each execution, so this looks harmless —
// confirm.
32 task(task&& other) noexcept
33 : m_taskId{other.m_taskId}
34 , m_dependencies{other.m_dependencies}
35 , m_remaining_dependencies{other.m_remaining_dependencies.load()}
36 , m_node{other.m_node}
37 , m_precedes{std::move(other.m_precedes)}
38 #if defined(CHECK_FOLLOWS)
39 , m_follows{std::move(other.m_follows)}
// Clear the source containers explicitly. This leaves the moved-from task
// with no recorded edges, whatever the container's move leaves behind.
42 other.m_precedes.clear();
43 #if defined(CHECK_FOLLOWS)
44 other.m_follows.clear();
// Copy assignment is disabled for the same reason as the copy constructor.
47 task& operator=(
const task&) =
delete;
// Move assignment. It mirrors the move constructor: the atomic counter is
// copied via load(), and the source containers are cleared after the move.
// NOTE(review): there is no self-assignment guard. `t = std::move(t)` ends
// with t.m_precedes emptied by the `other.m_precedes.clear()` below.
// m_executed is not transferred either.
48 task& operator=(task&& other) noexcept
50 m_taskId = other.m_taskId;
51 m_dependencies = other.m_dependencies;
52 m_remaining_dependencies = other.m_remaining_dependencies.load();
53 m_node = other.m_node;
54 m_precedes = std::move(other.m_precedes);
55 other.m_precedes.clear();
56 #if defined(CHECK_FOLLOWS)
57 m_follows = std::move(other.m_follows);
58 other.m_follows.clear();
// Creates a task for `node`. taskflow::emplace() calls this through
// m_tasks.emplace_back(node). The member initializers are not visible in
// this view; presumably they store &node in m_node — confirm.
// NOTE(review): this constructor appears not to be `explicit`, so a
// graph_node& would convert implicitly to a task.
63 task(ossia::graph_node& node)
// Declares that this task must finish before `other` may start.
// It records other's id as a successor and increments other's static
// predecessor count (m_dependencies). executor::run() copies that count into
// m_remaining_dependencies at the start of every execution.
68 void precede(task& other)
70 m_precedes.push_back(other.m_taskId);
71 #if defined(CHECK_FOLLOWS)
// Debug-only reverse edge. executor::execute uses it to assert that every
// predecessor has already executed.
72 other.m_follows.push_back(m_taskId);
74 other.m_dependencies++;
78 friend class taskflow;
79 friend class executor;
// Number of predecessors, accumulated by precede(). It does not change
// across runs.
82 int m_dependencies{0};
// Per-run countdown of unfinished predecessors. executor::run() resets it
// from m_dependencies, and executor::process_done() decrements it once per
// completed predecessor.
83 std::atomic_int m_remaining_dependencies{};
// Per-run completion flag. executor::run() resets it, and the exchange(true)
// in executor::process_done() sets it.
84 std::atomic_bool m_executed{};
// The graph node this task runs; execute() passes *m_node to the
// task_function.
86 ossia::graph_node* m_node{};
// Ids (indices into taskflow::m_tasks) of the tasks that depend on this one.
87 ossia::small_pod_vector<int, 4> m_precedes;
88 #if defined(CHECK_FOLLOWS)
// Debug-only: ids of the tasks this one depends on.
89 ossia::small_pod_vector<int, 4> m_follows;
// Removes all tasks, and with them every dependency edge stored in those
// tasks.
96 void clear() { m_tasks.clear(); }
// Reserves room for `sz` tasks. Pointers returned by emplace() point into
// m_tasks and stay valid only while it does not reallocate, so reserve enough
// capacity before emplacing.
98 void reserve(std::size_t sz) { m_tasks.reserve(sz); }
// Appends a task for `node` and sets its id to its index in m_tasks. It
// returns a task*; the return statement is not visible here, but presumably
// it returns &last.
// NOTE(review): that pointer is invalidated if a later emplace() reallocates
// m_tasks, so callers must reserve() enough capacity first. The size_t -> int
// conversion of the id also assumes fewer than INT_MAX tasks.
100 task* emplace(ossia::graph_node& node)
102 const int taskId = m_tasks.size();
103 auto& last = m_tasks.emplace_back(node);
104 last.m_taskId = taskId;
109 friend class executor;
// Task storage. A task's m_taskId is its index here, and task::m_precedes
// refers to successors by that index.
111 std::vector<task> m_tasks;
// Starts `nthreads` worker threads.
// Each worker busy-waits (with yield) until m_startFlag is set. The flag is
// set only after every std::thread has been stored in m_threads, so
// m_threads[k] is valid when the worker reads it. The worker then names
// itself, requests real-time priority 95, pins itself as an AudioTask thread,
// and starts pulling tasks from m_tasks.
117 explicit executor(
int nthreads)
120 m_threads.resize(nthreads);
122 for(
auto& t : m_threads)
// `k = k++` gives each worker the current index and advances the counter
// (declared on a line not visible here).
124 t = std::thread{[
this, k = k++] {
125 while(!m_startFlag.test())
126 std::this_thread::yield();
128 ossia::set_thread_name(m_threads[k],
"ossia exec " + std::to_string(k));
129 ossia::set_thread_realtime(m_threads[k], 95);
130 ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);
// Wait at most 100 us for work (moodycamel timeouts are in microseconds).
// This lets the worker re-check its loop condition (not visible here)
// periodically.
135 if(m_tasks.wait_dequeue_timed(t, 100))
// Release the workers now that m_threads is fully populated.
143 m_startFlag.test_and_set();
// Part of the executor teardown; the surrounding lines are not visible in
// this view. It iterates over the workers, presumably to join them after
// clearing m_running — confirm.
149 for(
auto& t : m_threads)
// Sets the callable that execute() applies to each task's graph_node.
155 void set_task_executor(task_function f) { m_func = std::move(f); }
// Executes every task of `tf` once, respecting precedence constraints.
// The work is shared between the worker threads and the calling thread.
// The dequeue loop below keeps going until m_doneTasks equals the number of
// tasks in `tf`.
157 void run(taskflow& tf)
// Nothing to do for an empty flow; the early return is on a line not
// visible here.
160 if(tf.m_tasks.empty())
// Reset per-run bookkeeping.
165 m_toDoTasks = tf.m_tasks.size();
166 m_doneTasks.store(0, std::memory_order_relaxed);
168 for(
auto& task : tf.m_tasks)
170 task.m_remaining_dependencies.store(
171 task.m_dependencies, std::memory_order_relaxed);
172 task.m_executed.store(
false, std::memory_order_relaxed);
173 #if defined(CHECK_EXEC_COUNTS)
174 m_checkVec[task.m_taskId] = 0;
// Intended to make the relaxed resets above visible to the workers, which
// issue an acquire fence in execute().
178 std::atomic_thread_fence(std::memory_order_seq_cst);
179 #if defined(DISABLE_DONE_TASKS)
// Root tasks whose node is disabled.
// NOTE(review): because it is thread_local, this buffer is shared by every
// executor that runs on this thread. It must be emptied after use; the
// clearing code is not visible in this view.
180 thread_local ossia::small_pod_vector<ossia::task*, 8> toCleanup;
// Seed the queue with the root tasks (those with no predecessors).
182 for(
auto& task : tf.m_tasks)
184 if(task.m_dependencies == 0)
186 #if defined(DISABLE_DONE_TASKS)
187 if(task.m_node->enabled())
190 #if defined(CHECK_EXEC_COUNTS)
191 m_checkVec[task.m_taskId]++;
192 if(m_checkVec[task.m_taskId] != 1)
195 stderr,
"!!! task {} enqueued {}\n", task.m_taskId,
196 m_checkVec[task.m_taskId]);
199 assert(task.m_dependencies == 0);
201 std::atomic_thread_fence(std::memory_order_release);
202 m_tasks.enqueue(&task);
204 #if defined(DISABLE_DONE_TASKS)
// A disabled root is not enqueued. It is handled in the toCleanup loop
// below, presumably via process_done().
207 toCleanup.push_back(&task);
213 #if defined(DISABLE_DONE_TASKS)
214 for(
auto& task : toCleanup)
// The calling thread keeps dequeuing (1 us timeout) until every task has
// been counted in m_doneTasks. Presumably it executes what it dequeues; the
// loop body is not visible here.
221 while(m_doneTasks.load(std::memory_order_relaxed) != m_toDoTasks)
224 if(m_tasks.wait_dequeue_timed(t, 1))
230 std::atomic_thread_fence(std::memory_order_seq_cst);
// Marks `task` as finished. It decrements the remaining-dependency count of
// each successor and finally counts `task` in m_doneTasks.
// A successor that becomes ready is handled as follows (the readiness test
// on `rem` is on lines not visible here):
//   - if its node is enabled, it is enqueued;
//   - with DISABLE_DONE_TASKS, a disabled one is completed recursively
//     without running.
234 void process_done(ossia::task& task)
// exchange(true) marks the task executed and reports whether it already was.
// The handling of the "already executed" case is on a line not visible here.
236 if(task.m_executed.exchange(
true))
238 assert(this->m_doneTasks != m_tf->m_tasks.size());
240 #if defined(DISABLE_DONE_TASKS)
// Successors that became ready but whose node is disabled. Unlike the
// thread_local buffer in run(), this is a plain local, so each recursive call
// gets its own.
241 ossia::small_pod_vector<ossia::task*, 8> toCleanup;
243 for(
int taskId : task.m_precedes)
245 auto& nextTask = m_tf->m_tasks[taskId];
246 assert(!nextTask.m_executed);
248 std::atomic_int& remaining = nextTask.m_remaining_dependencies;
249 assert(remaining > 0);
// Count left after this decrement. fetch_sub hands each caller a distinct
// value, so presumably only the caller that reaches zero schedules nextTask.
250 const int rem = remaining.fetch_sub(1, std::memory_order_relaxed) - 1;
254 #if defined(DISABLE_DONE_TASKS)
255 if(nextTask.m_node->enabled())
258 #if defined(CHECK_EXEC_COUNTS)
259 m_checkVec[nextTask.m_taskId]++;
260 if(m_checkVec[nextTask.m_taskId] != 1)
263 stderr,
"!!! task {} enqueued {}\n", nextTask.m_taskId,
264 m_checkVec[nextTask.m_taskId]);
// Release fence before handing the task over. It pairs with the acquire
// fence at the top of execute().
268 std::atomic_thread_fence(std::memory_order_release);
269 m_tasks.enqueue(&nextTask);
271 #if defined(DISABLE_DONE_TASKS)
274 toCleanup.push_back(&nextTask);
280 #if defined(DISABLE_DONE_TASKS)
// Disabled successors are finished immediately, which in turn releases their
// own successors.
// NOTE(review): recursion depth grows with the length of a chain of
// consecutive disabled nodes.
281 for(
auto& clean : toCleanup)
283 process_done(*clean);
287 this->m_doneTasks.fetch_add(1, std::memory_order_relaxed);
// Runs one task by calling m_func on its node. Lines not visible here
// presumably hand it to process_done() afterwards.
// Debug builds assert that the task has not run yet. With CHECK_FOLLOWS they
// also assert that all of its predecessors have run.
290 void execute(task& task)
// Pairs with the release fences issued before each enqueue.
292 std::atomic_thread_fence(std::memory_order_acquire);
295 assert(!task.m_executed);
296 #if defined(CHECK_FOLLOWS)
297 for(
auto& prev : task.m_follows)
299 auto& t = m_tf->m_tasks[prev];
300 assert(t.m_executed);
304 #if defined(CHECK_EXEC_COUNTS)
305 assert(m_checkVec[task.m_taskId] == 1);
307 m_func(*task.m_node);
309 #if defined(CHECK_EXEC_COUNTS)
310 assert(m_checkVec[task.m_taskId] == 1);
// NOTE(review): presumably the body of an exception handler around m_func
// (the try/catch lines are not visible). It reports no details about the
// error.
315 fmt::print(stderr,
"error !\n");
317 std::atomic_thread_fence(std::memory_order_release);
320 #if defined(CHECK_EXEC_COUNTS)
321 assert(m_checkVec[task.m_taskId] == 1);
324 std::atomic_thread_fence(std::memory_order_release);
// Callable applied to each task's node; set via set_task_executor().
327 task_function m_func;
// NOTE(review): presumably the workers' keep-running flag; its uses are on
// lines not visible here.
329 std::atomic_bool m_running{};
// Worker threads; the small_vector has room for 8 before allocating.
331 ossia::small_vector<std::thread, 8> m_threads;
// Set once at the end of the constructor. Workers wait for it before
// starting.
332 std::atomic_flag m_startFlag = ATOMIC_FLAG_INIT;
// Number of tasks completed in the current run().
335 std::atomic_size_t m_doneTasks = 0;
// Total number of tasks in the current run().
336 std::size_t m_toDoTasks = 0;
// Ready-to-run tasks, consumed by the workers and by the thread calling
// run().
338 moodycamel::BlockingConcurrentQueue<task*> m_tasks;
340 #if defined(CHECK_EXEC_COUNTS)
// Debug-only per-task enqueue counters.
// NOTE(review): fixed capacity of 5000; a task id >= 5000 indexes out of
// bounds.
341 std::array<std::atomic_int, 5000> m_checkVec;
346 #include <ossia/dataflow/graph/graph_static.hpp>
347 #include <ossia/detail/hash_map.hpp>
// Forward declaration, needed because custom_parallel_update befriends this
// type.
350 struct custom_parallel_exec;
// Graph-update policy for graph_static. It wraps an Impl update policy
// (tc_update<fast_tc> in custom_parallel_tc_graph) and rebuilds flow_graph,
// the task graph that custom_parallel_exec runs on `executor`.
351 template <
typename Impl>
352 struct custom_parallel_update
// Optional logger and per-node benchmark map. Which node_exec* functor
// update_graph installs depends on them; the selecting conditions are not
// visible here.
355 std::shared_ptr<ossia::logger_type>
logger;
356 std::shared_ptr<bench_map> perf_map;
// The executor's worker count comes from graph_setup_options::parallel_threads.
358 template <
typename Graph_T>
359 custom_parallel_update(Graph_T& g,
const ossia::graph_setup_options& opt)
361 , executor{opt.parallel_threads}
// update_graph (its first line is not visible here). It rebuilds the task
// graph, creating one task per node of `topo_order`, then one precedence
// constraint per edge of `graph`.
366 ossia::node_map& nodes,
const std::vector<graph_node*>& topo_order,
367 ossia::graph_t& graph)
// This must cover every emplace below, because the task pointers stored in
// flow_nodes are invalidated if flow_graph reallocates.
372 flow_graph.reserve(nodes.size());
// Variant using node_exec_logger_bench, which receives perf_map and logger.
// Each node's perf_map entry is reset to nullopt.
378 executor.set_task_executor(
379 node_exec_logger_bench{cur_state, *perf_map, *
logger});
380 for(
auto node : topo_order)
382 (*perf_map)[node] = std::nullopt;
383 flow_nodes[node] = flow_graph.emplace(*node);
// Variant using node_exec_logger, which receives logger only.
388 executor.set_task_executor(node_exec_logger{cur_state, *
logger});
389 for(
auto node : topo_order)
391 flow_nodes[node] = flow_graph.emplace(*node);
// Variant using plain node_exec.
397 executor.set_task_executor(node_exec{cur_state});
398 for(
auto node : topo_order)
400 flow_nodes[node] = flow_graph.emplace(*node);
// Translate graph edges into precedence constraints.
// NOTE(review): for an edge source -> target, the TARGET's task is made to
// precede the SOURCE's task, i.e. an edge is read as "source depends on
// target". Confirm this matches graph_t's edge convention.
// NOTE(review): flow_nodes[...] default-inserts a null task* for a node
// missing from topo_order, which precede() would then dereference.
404 for(
auto [ei, ei_end] = boost::edges(graph); ei != ei_end; ++ei)
407 auto& n1 = graph[edge.m_source];
408 auto& n2 = graph[edge.m_target];
410 auto& sender = flow_nodes[n2.get()];
411 auto& receiver = flow_nodes[n1.get()];
412 sender->precede(*receiver);
// Update entry point. Any Impl update runs on lines not visible here; the
// task graph is then rebuilt from the graph's nodes and impl.m_sub_graph.
416 template <
typename Graph_T,
typename DevicesT>
417 void operator()(Graph_T& g,
const DevicesT& devices)
420 update_graph(g.m_nodes, g.m_all_nodes, impl.m_sub_graph);
424 friend struct custom_parallel_exec;
// Execution state given to the node_exec* functors. Presumably assigned by
// custom_parallel_exec before each run — confirm.
427 execution_state* cur_state{};
// Task graph rebuilt by update_graph().
429 ossia::taskflow flow_graph;
// Thread pool that runs flow_graph.
430 ossia::executor executor;
// Each graph node's task inside flow_graph.
431 ossia::hash_map<graph_node*, ossia::task*> flow_nodes;
// Execution policy for graph_static. It runs the task graph prepared by
// custom_parallel_update on that policy's executor.
434 struct custom_parallel_exec
436 template <
typename Graph_T>
437 custom_parallel_exec(Graph_T&)
// Execution entry point; its name line is not visible here, presumably
// operator(). The lines between the parameter list and the run() call are
// also not visible; presumably they store `e` into self.cur_state — confirm.
441 template <
typename Graph_T,
typename Impl>
443 Graph_T& g, custom_parallel_update<Impl>&
self, ossia::execution_state& e,
444 const std::vector<ossia::graph_node*>&)
// Blocks until every task in the flow graph has been completed
// (see executor::run).
447 self.executor.run(
self.flow_graph);
// graph_static instantiation that updates through tc_update<fast_tc> wrapped
// in custom_parallel_update, and executes through custom_parallel_exec.
451 using custom_parallel_tc_graph
452 = graph_static<custom_parallel_update<tc_update<fast_tc>>, custom_parallel_exec>;
spdlog::logger & logger() noexcept
Logger to which errors are written; defaults to stderr.
Definition: context.cpp:104