// OSSIA — Open Scenario System for Interactive Application
// graph_parallel_impl.hpp
1 #pragma once
2 #include <ossia/dataflow/graph_node.hpp>
3 #include <ossia/detail/fmt.hpp>
4 #include <ossia/detail/lockfree_queue.hpp>
5 #include <ossia/detail/thread.hpp>
6 
7 #include <boost/container/static_vector.hpp>
8 
9 #include <blockingconcurrentqueue.h>
10 #include <concurrentqueue.h>
11 #include <smallfun.hpp>
12 
13 #include <thread>
14 #include <vector>
15 #define DISABLE_DONE_TASKS
16 //#define CHECK_FOLLOWS
17 //#define CHECK_EXEC_COUNTS
18 //#define memory_order_relaxed memory_order_seq_cst
19 //#define memory_order_acquire memory_order_seq_cst
20 //#define memory_order_release memory_order_seq_cst
21 namespace ossia
22 {
23 using task_function = smallfun::function<void(ossia::graph_node&), sizeof(void*) * 4>;
24 
25 class taskflow;
26 class executor;
27 class task
28 {
29 public:
30  task() = default;
31  task(const task&) = delete;
32  task(task&& other) noexcept
33  : m_taskId{other.m_taskId}
34  , m_dependencies{other.m_dependencies}
35  , m_remaining_dependencies{other.m_remaining_dependencies.load()}
36  , m_node{other.m_node}
37  , m_precedes{std::move(other.m_precedes)}
38 #if defined(CHECK_FOLLOWS)
39  , m_follows{std::move(other.m_follows)}
40 #endif
41  {
42  other.m_precedes.clear();
43 #if defined(CHECK_FOLLOWS)
44  other.m_follows.clear();
45 #endif
46  }
47  task& operator=(const task&) = delete;
48  task& operator=(task&& other) noexcept
49  {
50  m_taskId = other.m_taskId;
51  m_dependencies = other.m_dependencies;
52  m_remaining_dependencies = other.m_remaining_dependencies.load();
53  m_node = other.m_node;
54  m_precedes = std::move(other.m_precedes);
55  other.m_precedes.clear();
56 #if defined(CHECK_FOLLOWS)
57  m_follows = std::move(other.m_follows);
58  other.m_follows.clear();
59 #endif
60  return *this;
61  }
62 
63  task(ossia::graph_node& node)
64  : m_node{&node}
65  {
66  }
67 
68  void precede(task& other)
69  {
70  m_precedes.push_back(other.m_taskId);
71 #if defined(CHECK_FOLLOWS)
72  other.m_follows.push_back(m_taskId);
73 #endif
74  other.m_dependencies++;
75  }
76 
77 private:
78  friend class taskflow;
79  friend class executor;
80 
81  int m_taskId{};
82  int m_dependencies{0};
83  std::atomic_int m_remaining_dependencies{};
84  std::atomic_bool m_executed{};
85 
86  ossia::graph_node* m_node{};
87  ossia::small_pod_vector<int, 4> m_precedes;
88 #if defined(CHECK_FOLLOWS)
89  ossia::small_pod_vector<int, 4> m_follows;
90 #endif
91 };
92 
93 class taskflow
94 {
95 public:
96  void clear() { m_tasks.clear(); }
97 
98  void reserve(std::size_t sz) { m_tasks.reserve(sz); }
99 
100  task* emplace(ossia::graph_node& node)
101  {
102  const int taskId = m_tasks.size();
103  auto& last = m_tasks.emplace_back(node);
104  last.m_taskId = taskId;
105  return &last;
106  }
107 
108 private:
109  friend class executor;
110 
111  std::vector<task> m_tasks;
112 };
113 
//! Work-stealing-ish thread-pool scheduler for a taskflow.
//! One pass (run()) re-arms every task, seeds the queue with the roots,
//! and both the pool threads and the calling thread drain the queue until
//! all tasks have completed.
class executor
{
public:
  //! Spawn `nthreads` workers. Each worker spins on m_startFlag until the
  //! constructor has finished populating m_threads, because the worker reads
  //! m_threads[k] (its own handle) to set its name/priority/affinity.
  //! NOTE: atomic_flag::test() requires C++20.
  explicit executor(int nthreads)
  {
    m_running = true;
    m_threads.resize(nthreads);
    int k = 0;
    for(auto& t : m_threads)
    {
      // init-capture `k = k++`: copies the current index into the closure,
      // then increments the outer k for the next thread.
      t = std::thread{[this, k = k++] {
        while(!m_startFlag.test())
          std::this_thread::yield();

        ossia::set_thread_name(m_threads[k], "ossia exec " + std::to_string(k));
        ossia::set_thread_realtime(m_threads[k], 95);
        ossia::set_thread_pinned(ossia::thread_type::AudioTask, k);

        // Worker loop: the timed dequeue (timeout in microseconds per the
        // moodycamel API) guarantees we periodically re-check m_running so
        // the destructor can stop us.
        while(m_running)
        {
          task* t{};
          if(m_tasks.wait_dequeue_timed(t, 100))
          {
            execute(*t);
          }
        }
      }};
    }

    // Release all workers past their startup spin.
    m_startFlag.test_and_set();
  }

  //! Ask the workers to stop and join them. Relies on the dequeue timeout
  //! above; any tasks still queued are simply abandoned.
  ~executor()
  {
    m_running = false;
    for(auto& t : m_threads)
    {
      t.join();
    }
  }

  //! Install the callable invoked on each graph node (see task_function).
  void set_task_executor(task_function f) { m_func = std::move(f); }

  //! Execute one full pass over `tf`. Blocking: the calling thread also
  //! participates in draining the queue and returns only once every task
  //! has been processed.
  void run(taskflow& tf)
  {
    m_tf = &tf;
    if(tf.m_tasks.empty())
    {
      return;
    }

    m_toDoTasks = tf.m_tasks.size();
    m_doneTasks.store(0, std::memory_order_relaxed);

    // Re-arm each task: per-pass dependency countdown and executed flag.
    for(auto& task : tf.m_tasks)
    {
      task.m_remaining_dependencies.store(
          task.m_dependencies, std::memory_order_relaxed);
      task.m_executed.store(false, std::memory_order_relaxed);
#if defined(CHECK_EXEC_COUNTS)
      m_checkVec[task.m_taskId] = 0;
#endif
    }

    // Publish the relaxed resets above before any worker can pop a task.
    std::atomic_thread_fence(std::memory_order_seq_cst);
#if defined(DISABLE_DONE_TASKS)
    // Roots whose node is disabled are not enqueued; they are completed
    // inline (process_done) below so their successors still get released.
    // thread_local: run() may be called from different threads; reuse the
    // buffer without reallocating.
    thread_local ossia::small_pod_vector<ossia::task*, 8> toCleanup;
#endif
    // Seed the queue with the roots (tasks with no predecessors).
    for(auto& task : tf.m_tasks)
    {
      if(task.m_dependencies == 0)
      {
#if defined(DISABLE_DONE_TASKS)
        if(task.m_node->enabled())
#endif
        {
#if defined(CHECK_EXEC_COUNTS)
          m_checkVec[task.m_taskId]++;
          if(m_checkVec[task.m_taskId] != 1)
          {
            fmt::print(
                stderr, "!!! task {} enqueued {}\n", task.m_taskId,
                m_checkVec[task.m_taskId]);
            std::abort();
          }
          assert(task.m_dependencies == 0);
#endif
          std::atomic_thread_fence(std::memory_order_release);
          m_tasks.enqueue(&task);
        }
#if defined(DISABLE_DONE_TASKS)
        else
        {
          toCleanup.push_back(&task);
        }
#endif
      }
    }

#if defined(DISABLE_DONE_TASKS)
    // Complete disabled roots now that all enabled roots are queued.
    for(auto& task : toCleanup)
    {
      process_done(*task);
    }
    toCleanup.clear();
#endif

    // Help drain the queue until the pass is finished (1 µs poll timeout
    // keeps the completion check responsive).
    while(m_doneTasks.load(std::memory_order_relaxed) != m_toDoTasks)
    {
      task* t{};
      if(m_tasks.wait_dequeue_timed(t, 1))
      {
        execute(*t);
      }
    }

    // Make every worker's writes visible to the caller before returning.
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

private:
  //! Mark `task` completed (exactly once, guarded by the m_executed exchange)
  //! and release its successors: each successor's remaining-dependency count
  //! is decremented; one that reaches zero is enqueued — or, if its node is
  //! disabled, completed inline, which may recurse through chains of
  //! disabled nodes.
  void process_done(ossia::task& task)
  {
    if(task.m_executed.exchange(true))
      return;
    assert(this->m_doneTasks != m_tf->m_tasks.size());

#if defined(DISABLE_DONE_TASKS)
    // Local (not thread_local) here: process_done can recurse.
    ossia::small_pod_vector<ossia::task*, 8> toCleanup;
#endif
    for(int taskId : task.m_precedes)
    {
      auto& nextTask = m_tf->m_tasks[taskId];
      assert(!nextTask.m_executed);

      std::atomic_int& remaining = nextTask.m_remaining_dependencies;
      assert(remaining > 0);
      // fetch_sub returns the previous value; rem is the value after.
      const int rem = remaining.fetch_sub(1, std::memory_order_relaxed) - 1;
      assert(rem >= 0);
      if(rem == 0)
      {
#if defined(DISABLE_DONE_TASKS)
        if(nextTask.m_node->enabled())
#endif
        {
#if defined(CHECK_EXEC_COUNTS)
          m_checkVec[nextTask.m_taskId]++;
          if(m_checkVec[nextTask.m_taskId] != 1)
          {
            fmt::print(
                stderr, "!!! task {} enqueued {}\n", nextTask.m_taskId,
                m_checkVec[nextTask.m_taskId]);
            std::abort();
          }
#endif
          std::atomic_thread_fence(std::memory_order_release);
          m_tasks.enqueue(&nextTask);
        }
#if defined(DISABLE_DONE_TASKS)
        else
        {
          toCleanup.push_back(&nextTask);
        }
#endif
      }
    }

#if defined(DISABLE_DONE_TASKS)
    for(auto& clean : toCleanup)
    {
      process_done(*clean);
    }
#endif

    // Counted once per task per pass thanks to the exchange guard above.
    this->m_doneTasks.fetch_add(1, std::memory_order_relaxed);
  }

  //! Run one task's node through m_func, then complete it via process_done.
  //! Exceptions thrown by the node are swallowed (printed to stderr) so a
  //! failing node cannot wedge the scheduler mid-pass.
  void execute(task& task)
  {
    std::atomic_thread_fence(std::memory_order_acquire);
    try
    {
      assert(!task.m_executed);
#if defined(CHECK_FOLLOWS)
      for(auto& prev : task.m_follows)
      {
        auto& t = m_tf->m_tasks[prev];
        assert(t.m_executed);
      }
#endif

#if defined(CHECK_EXEC_COUNTS)
      assert(m_checkVec[task.m_taskId] == 1);
#endif
      m_func(*task.m_node);

#if defined(CHECK_EXEC_COUNTS)
      assert(m_checkVec[task.m_taskId] == 1);
#endif
    }
    catch(...)
    {
      fmt::print(stderr, "error !\n");
    }
    std::atomic_thread_fence(std::memory_order_release);

    process_done(task);
#if defined(CHECK_EXEC_COUNTS)
    assert(m_checkVec[task.m_taskId] == 1);
#endif

    std::atomic_thread_fence(std::memory_order_release);
  }

  task_function m_func; // callable applied to each node

  std::atomic_bool m_running{}; // cleared by ~executor to stop the workers

  ossia::small_vector<std::thread, 8> m_threads;
  std::atomic_flag m_startFlag = ATOMIC_FLAG_INIT; // gates worker startup

  taskflow* m_tf{};                  // taskflow of the current pass
  std::atomic_size_t m_doneTasks = 0; // tasks completed this pass
  std::size_t m_toDoTasks = 0;        // total tasks this pass

  moodycamel::BlockingConcurrentQueue<task*> m_tasks; // ready-task queue

#if defined(CHECK_EXEC_COUNTS)
  std::array<std::atomic_int, 5000> m_checkVec; // debug: enqueue count per task id
#endif
};
344 }
345 
346 #include <ossia/dataflow/graph/graph_static.hpp>
347 #include <ossia/detail/hash_map.hpp>
348 namespace ossia
349 {
350 struct custom_parallel_exec;
351 template <typename Impl>
352 struct custom_parallel_update
353 {
354 public:
355  std::shared_ptr<ossia::logger_type> logger;
356  std::shared_ptr<bench_map> perf_map;
357 
358  template <typename Graph_T>
359  custom_parallel_update(Graph_T& g, const ossia::graph_setup_options& opt)
360  : impl{g, opt}
361  , executor{opt.parallel_threads}
362  {
363  }
364 
365  void update_graph(
366  ossia::node_map& nodes, const std::vector<graph_node*>& topo_order,
367  ossia::graph_t& graph)
368  {
369  flow_nodes.clear();
370  flow_graph.clear();
371 
372  flow_graph.reserve(nodes.size());
373 
374  if(logger)
375  {
376  if(perf_map)
377  {
378  executor.set_task_executor(
379  node_exec_logger_bench{cur_state, *perf_map, *logger});
380  for(auto node : topo_order)
381  {
382  (*perf_map)[node] = std::nullopt;
383  flow_nodes[node] = flow_graph.emplace(*node);
384  }
385  }
386  else
387  {
388  executor.set_task_executor(node_exec_logger{cur_state, *logger});
389  for(auto node : topo_order)
390  {
391  flow_nodes[node] = flow_graph.emplace(*node);
392  }
393  }
394  }
395  else
396  {
397  executor.set_task_executor(node_exec{cur_state});
398  for(auto node : topo_order)
399  {
400  flow_nodes[node] = flow_graph.emplace(*node);
401  }
402  }
403 
404  for(auto [ei, ei_end] = boost::edges(graph); ei != ei_end; ++ei)
405  {
406  auto edge = *ei;
407  auto& n1 = graph[edge.m_source];
408  auto& n2 = graph[edge.m_target];
409 
410  auto& sender = flow_nodes[n2.get()];
411  auto& receiver = flow_nodes[n1.get()];
412  sender->precede(*receiver);
413  }
414  }
415 
416  template <typename Graph_T, typename DevicesT>
417  void operator()(Graph_T& g, const DevicesT& devices)
418  {
419  impl(g, devices);
420  update_graph(g.m_nodes, g.m_all_nodes, impl.m_sub_graph);
421  }
422 
423 private:
424  friend struct custom_parallel_exec;
425 
426  Impl impl;
427  execution_state* cur_state{};
428 
429  ossia::taskflow flow_graph;
430  ossia::executor executor;
431  ossia::hash_map<graph_node*, ossia::task*> flow_nodes;
432 };
433 
//! Execution policy: run one tick of the prepared taskflow on the pool.
struct custom_parallel_exec
{
  template <typename Graph_T>
  custom_parallel_exec(Graph_T&)
  {
  }

  template <typename Graph_T, typename Impl>
  void operator()(
      Graph_T& g, custom_parallel_update<Impl>& self, ossia::execution_state& e,
      const std::vector<ossia::graph_node*>&)
  {
    // Order matters: cur_state must point at this tick's state before run(),
    // since the task functors installed in update_graph use it — presumably
    // they hold a reference to this pointer (verify node_exec's member type).
    self.cur_state = &e;
    // Blocks until every task of the flow graph has executed.
    self.executor.run(self.flow_graph);
  }
};
450 
// Ready-made graph type: transitive-closure ("fast_tc") static scheduling
// combined with the parallel task executor above.
using custom_parallel_tc_graph
    = graph_static<custom_parallel_update<tc_update<fast_tc>>, custom_parallel_exec>;
453 }
454 
455 //#undef memory_order_relaxed
456 //#undef memory_order_acquire
457 //#undef memory_order_release
// Doxygen cross-reference residue (not part of this header):
//   Definition: git_info.h:7
//   spdlog::logger & logger() noexcept
//     "Where the errors will be logged. Default is stderr."
//   Definition: context.cpp:104