OSSIA
Open Scenario System for Interactive Application
message_queue.hpp
1 #pragma once
2 #include <ossia/detail/lockfree_queue.hpp>
3 #include <ossia/detail/ptr_set.hpp>
4 #include <ossia/network/base/device.hpp>
5 #include <ossia/network/base/parameter.hpp>
6 
7 #include <ankerl/unordered_dense.h>
8 
9 #include <smallfun.hpp>
10 
11 namespace ossia
12 {
13 struct received_value
14 {
15  ossia::net::parameter_base* address{};
16  ossia::value value;
17 };
18 
19 class message_queue final : public Nano::Observer
20 {
21 public:
23  message_queue(ossia::net::device_base& dev)
24  : device{dev}
25  {
26  dev.on_parameter_removing.connect<&message_queue::on_param_removed>(*this);
27  }
28 
29  ~message_queue()
30  {
31 #if defined(__cpp_exceptions)
32  try
33  {
34  for(auto reg : m_reg)
35  {
36  reg.first->remove_callback(reg.second.second);
37  }
38  }
39  catch(...)
40  {
41  }
42 #else
43  for(auto reg : m_reg)
44  {
45  reg.first->remove_callback(reg.second.second);
46  }
47 #endif
48  }
49 
50  bool try_dequeue(ossia::received_value& v) { return m_queue.try_dequeue(v); }
51 
52  void reg(ossia::net::parameter_base& p)
53  {
54  auto ptr = &p;
55  auto reg_it = m_reg.find(&p);
56  if(reg_it == m_reg.end())
57  {
58  auto it = p.add_callback([this, ptr](const ossia::value& val) {
59  m_queue.enqueue({ptr, val});
60  });
61  m_reg.insert({&p, {0, it}});
62  }
63  else
64  {
65  reg_it->second.first++;
66  }
67  }
68 
69  void unreg(ossia::net::parameter_base& p)
70  {
71  auto it = m_reg.find(&p);
72  if(it != m_reg.end())
73  {
74  it->second.first--;
75  if(it->second.first <= 0)
76  {
77  p.remove_callback(it->second.second);
78  m_reg.erase(it);
79  }
80  }
81  }
82 
83 private:
84  void on_param_removed(const ossia::net::parameter_base& p)
85  {
86  auto it = m_reg.find(const_cast<ossia::net::parameter_base*>(&p));
87  if(it != m_reg.end())
88  m_reg.erase(it);
89  }
90 
91  ossia::mpmc_queue<received_value> m_queue;
92 
93  ossia::ptr_map<
95  std::pair<int, ossia::net::parameter_base::callback_index>>
96  m_reg;
97 };
98 
99 class global_message_queue final : public Nano::Observer
100 {
101 public:
102  global_message_queue(ossia::net::device_base& dev)
103  {
104  dev.on_message.connect<&global_message_queue::on_message>(*this);
105  }
106 
107  void on_message(const ossia::net::parameter_base& p)
108  {
109  m_queue.enqueue({const_cast<ossia::net::parameter_base*>(&p), p.value()});
110  }
111 
112  bool try_dequeue(ossia::received_value& v) { return m_queue.try_dequeue(v); }
113 
114 private:
115  ossia::mpmc_queue<received_value> m_queue;
116 };
117 
118 struct coalescing_queue
119 {
120 public:
121  smallfun::function<void(ossia::net::parameter_base&, ossia::value)> callback;
122 
123  ossia::mpmc_queue<ossia::received_value> noncritical;
124  ossia::mpmc_queue<ossia::received_value> critical;
125 
126  ossia::hash_map<ossia::net::parameter_base*, ossia::value> coalesce;
127 
128  void process_messages()
129  {
130  ossia::received_value v;
131 
132  while(critical.try_dequeue(v))
133  {
134  callback(*v.address, v.value);
135  }
136 
137  coalesce.clear();
138  while(noncritical.try_dequeue(v))
139  {
140  coalesce[v.address] = v.value;
141  }
142 
143  for(auto& [p, v] : coalesce)
144  {
145  callback(*p, v);
146  }
147 
148  coalesce.clear();
149  }
150 };
151 }
void remove_callback(iterator it)
remove_callback Removes a callback identified by an iterator.
Definition: callback_container.hpp:114
iterator add_callback(T &&callback)
add_callback Add a new callback.
Definition: callback_container.hpp:90
Root of a device tree.
Definition: ossia/network/base/device.hpp:58
The parameter_base class.
Definition: ossia/network/base/parameter.hpp:48
virtual ossia::value value() const =0
Clone the current value without any network request.
The value class.
Definition: value.hpp:173
Definition: git_info.h:7
bool critical
Means that the node is very important, e.g. a "play" message.
Definition: node_attributes.hpp:92