OSSIA
Open Scenario System for Interactive Application
graph_parallel.hpp
1 #pragma once
2 #include <ossia-config.hpp>
3 #if defined(OSSIA_PARALLEL)
4 #include <ossia/dataflow/graph/graph_static.hpp>
5 #include <ossia/detail/hash_map.hpp>
6 /*
7 #include <tbb/flow_graph.h>
8 
9 namespace ossia
10 {
11 
12 struct parallel_exec;
13 template <typename Impl>
14 struct parallel_update
15 {
16 public:
17  using cont_node = tbb::flow::continue_node<tbb::flow::continue_msg>;
18  std::shared_ptr<spdlog::logger> logger;
19  std::shared_ptr<bench_map> perf_map;
20 
21  template <typename Graph_T>
22  parallel_update(Graph_T& g) : impl{g}
23  {
24  }
25 
26  void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
27  {
28  namespace tbf = tbb::flow;
29  flow_nodes.clear();
30  start_nodes.clear();
31  nodes_with_incoming_edges.clear();
32 
33  flow_graph.~graph();
34  new (&flow_graph) tbf::graph;
35 
36  if (logger)
37  {
38  if (perf_map)
39  {
40  for (const auto& n : nodes)
41  {
42  graph_node* node = n.first.get();
43  (*perf_map)[node] = std::nullopt;
44  flow_nodes.insert({node, std::make_unique<cont_node>(
45  flow_graph, node_exec_logger_bench{
46  cur_state, *perf_map,
47  *logger, *node})});
48  }
49  }
50  else
51  {
52  for (auto n : nodes)
53  {
54  graph_node* node = n.first.get();
55  flow_nodes.insert(
56  {node,
57  std::make_unique<cont_node>(
58  flow_graph, node_exec_logger{cur_state, *logger, *node})});
59  }
60  }
61  }
62  else
63  {
64  for (auto n : nodes)
65  {
66  graph_node* node = n.first.get();
67  flow_nodes.insert(
68  {node, std::make_unique<cont_node>(
69  flow_graph, node_exec{cur_state, *node})});
70  }
71  }
72 
73  for (auto n : nodes)
74  {
75  graph_node* n1 = n.first.get();
76  for (auto m : nodes)
77  {
78  graph_node* n2 = m.first.get();
79  if (n2 != n1)
80  {
81  if (boost::edge(n.second, m.second, graph).second)
82  {
83  tbf::make_edge(*flow_nodes[n2], *flow_nodes[n1]);
84  nodes_with_incoming_edges.push_back(n1);
85  }
86  }
87  }
88  }
89 
90  for (auto n : nodes)
91  {
92  if (!ossia::contains(nodes_with_incoming_edges, n.first.get()))
93  start_nodes.push_back(n.first.get());
94  }
95 
96  start_node
97  = std::make_unique<tbf::broadcast_node<tbf::continue_msg>>(flow_graph);
98  for (auto node : start_nodes)
99  {
100  tbf::make_edge(*start_node, *flow_nodes[node]);
101  }
102  }
103 
104  template <typename Graph_T, typename DevicesT>
105  void operator()(Graph_T& g, const DevicesT& devices)
106  {
107  impl(g, devices);
108  update_graph(g.m_nodes, impl.m_sub_graph);
109  }
110 
111 private:
112  friend struct parallel_exec;
113 
114  Impl impl;
115  execution_state* cur_state{};
116  std::unique_ptr<tbb::flow::broadcast_node<tbb::flow::continue_msg>> start_node;
117  std::vector<graph_node*> nodes_with_incoming_edges;
118 
119  tbb::flow::graph flow_graph;
120  ossia::hash_map<graph_node*, std::unique_ptr<cont_node>> flow_nodes;
121  std::vector<graph_node*> start_nodes;
122 };
123 
124 struct parallel_exec
125 {
126  template <typename Graph_T>
127  parallel_exec(Graph_T&)
128  {
129  }
130 
131  template <typename Graph_T, typename Impl>
132  void operator()(
133  Graph_T& g, parallel_update<Impl>& self, ossia::execution_state& e,
134  const std::vector<ossia::graph_node*>&)
135  {
136  self.cur_state = &e;
137  self.start_node->try_put(tbb::flow::continue_msg{});
138  self.flow_graph.wait_for_all();
139  }
140 };
141 
142 using parallel_tc_graph
143  = graph_static<parallel_update<tc_update<fast_tc>>, parallel_exec>;
144 }
145 */
146 
147 /*
148 #if __has_include(<taskflow/taskflow.hpp>)
149 #include <taskflow/taskflow.hpp>
150 namespace ossia
151 {
152 struct cpptf_exec;
153 template <typename Impl>
154 struct cpptf_update
155 {
156 public:
157  std::shared_ptr<spdlog::logger> logger;
158  std::shared_ptr<bench_map> perf_map;
159 
160  template <typename Graph_T>
161  cpptf_update(Graph_T& g) : impl{g}
162  {
163  }
164 
165  void update_graph(ossia::node_map& nodes, ossia::graph_t& graph)
166  {
167  flow_nodes.clear();
168  flow_graph.clear();
169 
170  if (logger)
171  {
172  if (perf_map)
173  {
174  for (const auto& n : nodes)
175  {
176  graph_node* node = n.first.get();
177  (*perf_map)[node] = std::nullopt;
178  flow_nodes[node] =
179 flow_graph.emplace(node_exec_logger_bench{cur_state, *perf_map, *logger,
180 *node});
181  }
182  }
183  else
184  {
185  for (auto n : nodes)
186  {
187  graph_node* node = n.first.get();
188  flow_nodes[node] = flow_graph.emplace(node_exec_logger{cur_state,
189 *logger, *node});
190  }
191  }
192  }
193  else
194  {
195  for (auto n : nodes)
196  {
197  graph_node* node = n.first.get();
198  flow_nodes[node] = flow_graph.emplace(node_exec{cur_state, *node});
199  }
200  }
201 
202  // TODO: iterate over the graph's edges directly instead of testing every ordered node pair with boost::edge (O(n^2) lookups)
203  for (auto n : nodes)
204  {
205  graph_node* n1 = n.first.get();
206  for (auto m : nodes)
207  {
208  graph_node* n2 = m.first.get();
209  if (n2 != n1)
210  {
211  if (boost::edge(n.second, m.second, graph).second)
212  {
213  auto& sender = flow_nodes[n2];
214  auto& receiver = flow_nodes[n1];
215  sender.precede(receiver);
216  }
217  }
218  }
219  }
220  }
221 
222  template <typename Graph_T, typename DevicesT>
223  void operator()(Graph_T& g, const DevicesT& devices)
224  {
225  impl(g, devices);
226  update_graph(g.m_nodes, impl.m_sub_graph);
227  }
228 
229 private:
230  friend struct cpptf_exec;
231 
232  Impl impl;
233  execution_state* cur_state{};
234 
235  tf::Taskflow flow_graph;
236  tf::Executor executor;
237  ossia::hash_map<graph_node*, tf::Task> flow_nodes;
238 };
239 
240 struct cpptf_exec
241 {
242  template <typename Graph_T>
243  cpptf_exec(Graph_T&)
244  {
245  }
246 
247  template <typename Graph_T, typename Impl>
248  void operator()(
249  Graph_T& g, cpptf_update<Impl>& self, ossia::execution_state& e,
250  const std::vector<ossia::graph_node*>&)
251  {
252  self.cur_state = &e;
253  self.executor.run(self.flow_graph).get();
254  }
255 };
256 
257 using cpptf_tc_graph
258  = graph_static<cpptf_update<tc_update<fast_tc>>, cpptf_exec>;
259 }
260 #endif
261 */
262 
263 #include <ossia/dataflow/graph/graph_parallel_impl.hpp>
264 
265 #endif