OSSIA
Open Scenario System for Interactive Application
receiver.hpp
1 #pragma once
3 #include <ossia/detail/thread.hpp>
4 
5 #include <oscpack/ip/UdpSocket.h>
6 #include <oscpack/osc/OscDebug.h>
7 #include <oscpack/osc/OscPacketListener.h>
8 
9 #include <functional>
10 #include <memory>
11 #include <sstream>
12 #include <thread>
13 
14 namespace oscpack
15 {
16 
17 namespace detail
18 {
19 
20 template <typename Impl_T>
21 struct ClearListener : public oscpack::TimerListener
22 {
23  ClearListener(UdpSocket<Impl_T>& s)
24  : socket{s}
25  {
26  }
27  UdpSocket<Impl_T>& socket;
28 
29  void TimerExpired() override { socket.AsynchronousBreak(); }
30 };
31 
32 template <typename Impl_T>
33 class ReceiveSocket : public UdpSocket<Impl_T>
34 {
35  SocketReceiveMultiplexer<Impl_T> mux_;
36  PacketListener* listener_;
37 
38 public:
39  ReceiveSocket(const IpEndpointName& localEndpoint, PacketListener* listener)
40  : listener_(listener)
41  {
42  this->Bind(localEndpoint);
43  mux_.AttachSocketListener(&this->impl_, listener_);
44  }
45 
46  ~ReceiveSocket() { mux_.DetachSocketListener(&this->impl_, listener_); }
47 
48  // see SocketReceiveMultiplexer above for the behaviour of these methods...
49  void Run() { mux_.Run(); }
50  void Break()
51  {
52  ClearListener<Impl_T> l{*this};
53  mux_.AttachPeriodicTimerListener(0, &l);
54  mux_.Break();
55  }
56  void AsynchronousBreak()
57  {
58  ClearListener<Impl_T> l{*this};
59  mux_.AttachPeriodicTimerListener(0, &l);
60  mux_.AsynchronousBreak();
61  }
62 };
63 }
64 using ReceiveSocket = detail::UdpListeningReceiveSocket<detail::Implementation>;
65 }
66 namespace osc
67 {
68 
69 template <typename MessageHandler>
75 class listener final : public oscpack::OscPacketListener
76 {
77 public:
78  listener(MessageHandler msg)
79  : m_messageHandler{msg}
80  {
81  }
82 
83  void ProcessMessage(
84  const oscpack::ReceivedMessage& m, const oscpack::IpEndpointName& ip) override
85  {
86  try
87  {
88  m_messageHandler(m, ip);
89  }
90  catch(std::exception& e)
91  {
92  std::stringstream s;
93  oscpack::debug(s, m);
94 
95  ossia::logger().error(
96  "osc::listener::ProcessMessage error: '{}': {}", s.str(), e.what());
97  }
98  catch(...)
99  {
100  std::stringstream s;
101  oscpack::debug(s, m);
102  ossia::logger().error("osc::listener::ProcessMessage error: '{}'", s.str());
103  }
104  }
105 
106  void ProcessPacket(
107  const char* data, int size, const oscpack::IpEndpointName& remoteEndpoint) override
108  {
109  try
110  {
111  oscpack::ReceivedPacket p(data, size);
112  if(p.IsBundle())
113  this->ProcessBundle(oscpack::ReceivedBundle(p), remoteEndpoint);
114  else
115  this->ProcessMessage(oscpack::ReceivedMessage(p), remoteEndpoint);
116  }
117  catch(std::exception& e)
118  {
119  ossia::logger().error("osc::listener::ProcessPacket error: {}", e.what());
120  }
121  catch(...)
122  {
123  ossia::logger().error("osc::listener::ProcessPacket error");
124  }
125  }
126 
127 private:
128  MessageHandler m_messageHandler;
129 };
130 
137 class receiver
138 {
139 public:
140  template <typename Handler>
141  receiver(unsigned int port, Handler msg)
142  : m_impl{std::make_unique<listener<Handler>>(msg)}
143  {
144  setPort(port);
145  }
146 
147  receiver() = default;
148  receiver(receiver&& other) noexcept
149  {
150  other.stop();
151  m_impl = std::move(other.m_impl);
152  m_socket = std::move(other.m_socket);
153  setPort(other.m_port);
154  }
155 
156  receiver& operator=(receiver&& other) noexcept
157  {
158  stop();
159 
160  m_impl = std::move(other.m_impl);
161  m_socket = std::move(other.m_socket);
162 
163  setPort(other.m_port);
164 
165  return *this;
166  }
167 
168  ~receiver() { stop(); }
169 
170  void run()
171  {
172  if(m_runThread.joinable())
173  stop();
174 
175  m_runThread = std::thread([this] {
176  ossia::set_thread_name("ossia osc");
177  run_impl();
178  });
179  while(!m_running)
180  std::this_thread::sleep_for(std::chrono::microseconds(1));
181  }
182 
183  void run_impl()
184  {
185  m_running = true;
186  osc_thread_run:
187  try
188  {
189  m_socket->Run();
190  }
191  catch(...)
192  {
193  goto osc_thread_run;
194  }
195  }
196 
197  void stop()
198  {
199  m_running = false;
200  if(m_socket)
201  {
202  if(m_runThread.joinable())
203  {
204  try
205  {
206  oscpack::UdpTransmitSocket send_socket(
207  oscpack::IpEndpointName("127.0.0.1", port()));
208  send_socket.Send("__stop_", 8);
209  m_socket->AsynchronousBreak();
210  std::this_thread::sleep_for(std::chrono::milliseconds(50));
211 
212  m_runThread.join();
213  }
214  catch(std::exception& e)
215  {
216  if(m_runThread.joinable())
217  m_runThread.detach();
218  }
219  }
220 
221  m_socket.reset();
222  }
223  else
224  {
225  if(m_runThread.joinable())
226  {
227  // Error somewhere: the thread is joinable, but there's no socket...
228  m_runThread.detach();
229  }
230  }
231  }
232 
233  unsigned int port() const { return m_port; }
234 
235  unsigned int setPort(unsigned int port)
236  {
237  m_port = port;
238 
239  bool ok = false;
240  while(!ok)
241  {
242  try
243  {
244  m_socket = std::make_unique<oscpack::ReceiveSocket>(
245  oscpack::IpEndpointName(oscpack::IpEndpointName::ANY_ADDRESS, m_port),
246  m_impl.get());
247  ok = true;
248  }
249  catch(std::runtime_error&)
250  {
251  m_port++;
252  }
253  }
254 
255  return m_port;
256  }
257 
258 private:
259  unsigned int m_port = 0;
260  std::unique_ptr<oscpack::OscPacketListener> m_impl;
261  std::unique_ptr<oscpack::ReceiveSocket> m_socket;
262 
263  std::thread m_runThread;
264  std::atomic_bool m_running = false;
265 };
266 }
The listener class.
Definition: receiver.hpp:76
The receiver class.
Definition: receiver.hpp:138
spdlog::logger & logger() noexcept
Where the errors will be logged. Default is stderr.
Definition: context.cpp:104