OSSIA
Open Scenario System for Interactive Application
pulseaudio_protocol.hpp
1 #pragma once
2 #include <ossia/detail/config.hpp>
3 
4 #if defined(OSSIA_ENABLE_PULSEAUDIO)
5 
6 #if __has_include(<pulse/pulseaudio.h>)
7 #include <ossia/audio/audio_engine.hpp>
8 #include <ossia/detail/dylib_loader.hpp>
9 #include <ossia/detail/thread.hpp>
10 
11 #include <pulse/pulseaudio.h>
12 
13 #define OSSIA_AUDIO_PULSEAUDIO 1
14 namespace ossia
15 {
16 
17 class libpulse
18 {
19 public:
20  decltype(&::pa_threaded_mainloop_new) pa_threaded_mainloop_new{};
21  decltype(&::pa_threaded_mainloop_free) pa_threaded_mainloop_free{};
22  decltype(&::pa_threaded_mainloop_get_api) pa_threaded_mainloop_get_api{};
23  decltype(&::pa_context_new) pa_context_new{};
24  decltype(&::pa_context_ref) pa_context_ref{};
25  decltype(&::pa_context_unref) pa_context_unref{};
26  decltype(&::pa_context_set_state_callback) pa_context_set_state_callback{};
27  decltype(&::pa_threaded_mainloop_lock) pa_threaded_mainloop_lock{};
28  decltype(&::pa_threaded_mainloop_unlock) pa_threaded_mainloop_unlock{};
29  decltype(&::pa_threaded_mainloop_start) pa_threaded_mainloop_start{};
30  decltype(&::pa_threaded_mainloop_stop) pa_threaded_mainloop_stop{};
31  decltype(&::pa_context_connect) pa_context_connect{};
32  decltype(&::pa_context_get_state) pa_context_get_state{};
33  decltype(&::pa_threaded_mainloop_wait) pa_threaded_mainloop_wait{};
34  decltype(&::pa_channel_map_init_stereo) pa_channel_map_init_stereo{};
35  decltype(&::pa_stream_new) pa_stream_new{};
36  decltype(&::pa_stream_new_with_proplist) pa_stream_new_with_proplist{};
37  decltype(&::pa_stream_ref) pa_stream_ref{};
38  decltype(&::pa_stream_unref) pa_stream_unref{};
39  decltype(&::pa_stream_get_state) pa_stream_get_state{};
40  decltype(&::pa_stream_get_time) pa_stream_get_time{};
41  decltype(&::pa_stream_set_state_callback) pa_stream_set_state_callback{};
42  decltype(&::pa_stream_set_write_callback) pa_stream_set_write_callback{};
43  decltype(&::pa_stream_set_read_callback) pa_stream_set_read_callback{};
44  decltype(&::pa_stream_set_overflow_callback) pa_stream_set_overflow_callback{};
45  decltype(&::pa_stream_connect_playback) pa_stream_connect_playback{};
46  decltype(&::pa_stream_cork) pa_stream_cork{};
47  decltype(&::pa_stream_is_corked) pa_stream_is_corked{};
48  decltype(&::pa_threaded_mainloop_signal) pa_threaded_mainloop_signal{};
49  decltype(&::pa_stream_begin_write) pa_stream_begin_write{};
50  decltype(&::pa_stream_write) pa_stream_write{};
51  decltype(&::pa_stream_set_name) pa_stream_set_name{};
52 
53  static const libpulse& instance()
54  {
55  static const libpulse self;
56  return self;
57  }
58 
59 private:
60  dylib_loader library;
61 
62  libpulse()
63  : library("libpulse.so.0")
64  {
65  // in terms of regex:
66  // decltype\‍(&::([a-z_]+)\‍) [a-z_]+{};
67  // \1 = library.symbol<decltype(&::\1)>("\1");
68 
69  pa_threaded_mainloop_new = library.symbol<decltype(&::pa_threaded_mainloop_new)>(
70  "pa_threaded_mainloop_new");
71  pa_threaded_mainloop_free = library.symbol<decltype(&::pa_threaded_mainloop_free)>(
72  "pa_threaded_mainloop_free");
73  pa_threaded_mainloop_get_api
74  = library.symbol<decltype(&::pa_threaded_mainloop_get_api)>(
75  "pa_threaded_mainloop_get_api");
76  pa_threaded_mainloop_lock = library.symbol<decltype(&::pa_threaded_mainloop_lock)>(
77  "pa_threaded_mainloop_lock");
78  pa_threaded_mainloop_unlock
79  = library.symbol<decltype(&::pa_threaded_mainloop_unlock)>(
80  "pa_threaded_mainloop_unlock");
81  pa_threaded_mainloop_start = library.symbol<decltype(&::pa_threaded_mainloop_start)>(
82  "pa_threaded_mainloop_start");
83  pa_threaded_mainloop_stop = library.symbol<decltype(&::pa_threaded_mainloop_stop)>(
84  "pa_threaded_mainloop_stop");
85  pa_threaded_mainloop_signal
86  = library.symbol<decltype(&::pa_threaded_mainloop_signal)>(
87  "pa_threaded_mainloop_signal");
88  pa_context_new = library.symbol<decltype(&::pa_context_new)>("pa_context_new");
89  pa_context_ref = library.symbol<decltype(&::pa_context_ref)>("pa_context_ref");
90  pa_context_unref = library.symbol<decltype(&::pa_context_unref)>("pa_context_unref");
91  pa_context_set_state_callback
92  = library.symbol<decltype(&::pa_context_set_state_callback)>(
93  "pa_context_set_state_callback");
94  pa_context_connect
95  = library.symbol<decltype(&::pa_context_connect)>("pa_context_connect");
96  pa_context_get_state
97  = library.symbol<decltype(&::pa_context_get_state)>("pa_context_get_state");
98  pa_threaded_mainloop_wait = library.symbol<decltype(&::pa_threaded_mainloop_wait)>(
99  "pa_threaded_mainloop_wait");
100  pa_channel_map_init_stereo = library.symbol<decltype(&::pa_channel_map_init_stereo)>(
101  "pa_channel_map_init_stereo");
102  pa_stream_new = library.symbol<decltype(&::pa_stream_new)>("pa_stream_new");
103  pa_stream_new_with_proplist
104  = library.symbol<decltype(&::pa_stream_new_with_proplist)>(
105  "pa_stream_new_with_proplist");
106  pa_stream_ref = library.symbol<decltype(&::pa_stream_ref)>("pa_stream_ref");
107  pa_stream_unref = library.symbol<decltype(&::pa_stream_unref)>("pa_stream_unref");
108  pa_stream_get_state
109  = library.symbol<decltype(&::pa_stream_get_state)>("pa_stream_get_state");
110  pa_stream_get_time
111  = library.symbol<decltype(&::pa_stream_get_time)>("pa_stream_get_time");
112  pa_stream_set_state_callback
113  = library.symbol<decltype(&::pa_stream_set_state_callback)>(
114  "pa_stream_set_state_callback");
115  pa_stream_set_write_callback
116  = library.symbol<decltype(&::pa_stream_set_write_callback)>(
117  "pa_stream_set_write_callback");
118  pa_stream_set_read_callback
119  = library.symbol<decltype(&::pa_stream_set_read_callback)>(
120  "pa_stream_set_read_callback");
121  pa_stream_set_overflow_callback
122  = library.symbol<decltype(&::pa_stream_set_overflow_callback)>(
123  "pa_stream_set_overflow_callback");
124  pa_stream_connect_playback = library.symbol<decltype(&::pa_stream_connect_playback)>(
125  "pa_stream_connect_playback");
126  pa_stream_cork = library.symbol<decltype(&::pa_stream_cork)>("pa_stream_cork");
127  pa_stream_is_corked
128  = library.symbol<decltype(&::pa_stream_is_corked)>("pa_stream_is_corked");
129  pa_stream_begin_write
130  = library.symbol<decltype(&::pa_stream_begin_write)>("pa_stream_begin_write");
131  pa_stream_write = library.symbol<decltype(&::pa_stream_write)>("pa_stream_write");
132  pa_stream_set_name
133  = library.symbol<decltype(&::pa_stream_set_name)>("pa_stream_set_name");
134 
135  // in terms of regex:
136  // decltype\‍(&::([a-z_]+)\‍) [a-z_]+{};
137  // assert(\1);
138  assert(pa_threaded_mainloop_new);
139  assert(pa_threaded_mainloop_free);
140  assert(pa_threaded_mainloop_get_api);
141  assert(pa_threaded_mainloop_lock);
142  assert(pa_threaded_mainloop_unlock);
143  assert(pa_threaded_mainloop_start);
144  assert(pa_threaded_mainloop_stop);
145  assert(pa_threaded_mainloop_signal);
146  assert(pa_context_new);
147  assert(pa_context_ref);
148  assert(pa_context_unref);
149  assert(pa_context_set_state_callback);
150  assert(pa_context_connect);
151  assert(pa_context_get_state);
152  assert(pa_threaded_mainloop_wait);
153  assert(pa_channel_map_init_stereo);
154  assert(pa_stream_new);
155  assert(pa_stream_new_with_proplist);
156  assert(pa_stream_ref);
157  assert(pa_stream_unref);
158  assert(pa_stream_get_state);
159  assert(pa_stream_get_time);
160  assert(pa_stream_set_state_callback);
161  assert(pa_stream_set_write_callback);
162  assert(pa_stream_set_read_callback);
163  assert(pa_stream_set_overflow_callback);
164  assert(pa_stream_connect_playback);
165  assert(pa_stream_cork);
166  assert(pa_stream_is_corked);
167  assert(pa_stream_begin_write);
168  assert(pa_stream_write);
169  assert(pa_stream_set_name);
170  }
171 };
172 
173 class pulseaudio_engine final : public audio_engine
174 {
175 public:
176  struct mainloop_locker
177  {
178  pulseaudio_engine& engine;
179  mainloop_locker(pulseaudio_engine& engine)
180  : engine{engine}
181  {
182  if(engine.m_mainloop)
183  {
184  const auto& pa = libpulse::instance();
185  pa.pa_threaded_mainloop_lock(engine.m_mainloop);
186  }
187  }
188 
189  ~mainloop_locker()
190  {
191  if(engine.m_mainloop)
192  {
193  const auto& pa = libpulse::instance();
194  pa.pa_threaded_mainloop_unlock(engine.m_mainloop);
195  }
196  }
197  };
198 
199  using sample_rate_t = double;
200  using buffer_size_t = uint32_t;
201 
202  pulseaudio_engine(
203  std::string name, std::string card_in, std::string card_out, int inputs,
204  int outputs, int rate, int bs)
205  {
206  inputs = 0;
207  outputs = 2;
208  bs = 512;
209  rate = 48000;
210  effective_inputs = inputs;
211  effective_outputs = outputs;
212  m_frames = bs;
213 
214  const auto& pa = libpulse::instance();
215 
216  m_mainloop = pa.pa_threaded_mainloop_new();
217  if(!m_mainloop)
218  {
219  // let's switch to Either or Outcome shall we :-)
220  throw std::runtime_error("Cannot initialize pulseaudio main loop");
221  }
222 
223  m_api = pa.pa_threaded_mainloop_get_api(m_mainloop);
224  if(!m_api)
225  {
226  throw std::runtime_error("Cannot initialize pulseaudio mainloop API");
227  }
228 
229  // both for pulseaudio, and for JACK, instead of requesting a device,
230  // you declare your app as being a device which will be a piece of the
231  // audio graph - so there should be an API to set its name in that graph.
232  m_context = pa.pa_context_new(m_api, m_name.data());
233  if(!m_context)
234  {
235  throw std::runtime_error("Cannot initialize pulseaudio context");
236  }
237 
238  auto context_callback = [](pa_context*, void* mainloop) {
239  auto& pa = libpulse::instance();
240  pa.pa_threaded_mainloop_signal((pa_threaded_mainloop*)mainloop, 0);
241  };
242  pa.pa_context_set_state_callback(m_context, context_callback, m_mainloop);
243 
244  mainloop_locker lock{*this};
245 
246  if(auto err = pa.pa_threaded_mainloop_start(m_mainloop); err != 0)
247  {
248  throw std::runtime_error("Cannot start pulseaudio main loop");
249  }
250  if(auto err
251  = pa.pa_context_connect(m_context, nullptr, PA_CONTEXT_NOAUTOSPAWN, nullptr);
252  err != 0)
253  {
254  throw std::runtime_error("Cannot conntext the pulseaudio context");
255  }
256 
257  // Wait until everything is ready - completely arbitrary timeout, maybe
258  // this should be settable
259  using namespace std;
260  using namespace std::literals;
261  static const constexpr auto default_timeout = 3s;
262  {
263  auto t0 = chrono::steady_clock::now();
264  bool timeout = false;
265  bool ready = false;
266  while((timeout = (chrono::steady_clock::now() - t0 < default_timeout)) && !ready)
267  {
268  switch(pa.pa_context_get_state(m_context))
269  {
270  case PA_CONTEXT_CONNECTING:
271  case PA_CONTEXT_AUTHORIZING:
272  case PA_CONTEXT_SETTING_NAME:
273  pa.pa_threaded_mainloop_wait(m_mainloop);
274  continue;
275 
276  case PA_CONTEXT_READY:
277  ready = true;
278  break;
279 
280  case PA_CONTEXT_FAILED:
281  case PA_CONTEXT_TERMINATED:
282  default:
283  throw std::runtime_error("Invalid context state");
284  }
285  }
286 
287  if(!ready)
288  {
289  throw std::runtime_error("Context creation timeout");
290  }
291  }
292 
296 
297  pa_sample_spec sample_specifications;
298  sample_specifications.format = PA_SAMPLE_FLOAT32LE;
299 
300  // for pulseaudio, samplerate is an uint
301  sample_specifications.rate = rate;
302 
303  // in pulse and JACK we ask how many channels we want ; in addition to
304  // Jack, pulse does intelligent resampling / upmixing / downmixing
305  sample_specifications.channels = 2;
306 
307  pa_channel_map map;
308  pa.pa_channel_map_init_stereo(&map);
309 
310  m_stream = pa.pa_stream_new(m_context, "ossia", &sample_specifications, &map);
311  if(!m_stream)
312  {
313  throw std::runtime_error("Cannot initialize pulseaudio stream");
314  }
315 
316  const auto stream_callback = [](pa_stream* s, void* mainloop) {
317  auto& pa = libpulse::instance();
318  pa.pa_threaded_mainloop_signal((pa_threaded_mainloop*)mainloop, 0);
319  };
320 
321  pa.pa_stream_set_state_callback(m_stream, stream_callback, m_mainloop);
322  pa.pa_stream_set_write_callback(m_stream, output_callback, this);
323 
324  // Don't set values here, instead the server will provide what it judges to
325  // be the best.
326  pa_buffer_attr buffer_attr;
327  buffer_attr.maxlength = m_frames * sizeof(float) * inputs;
328  buffer_attr.tlength = m_frames * sizeof(float) * outputs;
329  buffer_attr.prebuf = (uint32_t)-1;
330  buffer_attr.minreq = 0;
331 
332  const auto stream_flags = static_cast<pa_stream_flags_t>(
333  PA_STREAM_START_CORKED | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_NOT_MONOTONIC
334  | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY);
335 
336  // Connect stream to the default audio output sink
337  if(auto err = pa.pa_stream_connect_playback(
338  m_stream, nullptr, &buffer_attr, stream_flags, nullptr, nullptr);
339  err != 0)
340  {
341  throw std::runtime_error("Cannot start pulseaudio stream");
342  }
343 
344  {
345  auto t0 = chrono::steady_clock::now();
346  bool timeout = false;
347  bool ready = false;
348  while((timeout = (chrono::steady_clock::now() - t0 < default_timeout)) && !ready)
349  {
350  switch(pa.pa_stream_get_state(m_stream))
351  {
352  case PA_STREAM_CREATING:
353  pa.pa_threaded_mainloop_wait(m_mainloop);
354  break;
355  case PA_STREAM_READY:
356  ready = true;
357  break;
358  default:
359  throw std::runtime_error("Invalid stream state");
360  }
361  }
362 
363  if(!ready)
364  {
365  throw std::runtime_error("Stream creation timeout");
366  }
367  }
368 
369  // cork means pausing, cork = 0 means resuming
370  pa.pa_stream_cork(m_stream, 0, success_cb, this);
371  }
372 
373  bool running() const override
374  {
375  auto& pa = libpulse::instance();
376  return m_stream && pa.pa_stream_is_corked(m_stream);
377  }
378 
379  ~pulseaudio_engine() override
380  {
381  stop();
382 
383  auto& pa = libpulse::instance();
384  {
385  mainloop_locker lock{*this};
386  pa.pa_stream_unref(m_stream);
387  m_stream = nullptr;
388  pa.pa_context_unref(m_context);
389  m_context = nullptr;
390  }
391 
392  pa.pa_threaded_mainloop_stop(m_mainloop);
393 
394  pa.pa_threaded_mainloop_free(m_mainloop);
395  m_mainloop = nullptr;
396  m_api = nullptr;
397  m_stream = nullptr;
398  }
399 
400 private:
401  static int clearBuffers(float** float_output, unsigned long nframes, int outs)
402  {
403  for(int i = 0; i < outs; i++)
404  {
405  auto chan = float_output[i];
406  for(std::size_t j = 0; j < nframes; j++)
407  chan[j] = 0.f;
408  }
409 
410  return 0;
411  }
412 
413  static void success_cb(pa_stream*, int, void*) { }
414  static void output_callback(pa_stream* stream, size_t requested_bytes, void* userdata)
415  {
416  static const thread_local auto _ = [] {
417  ossia::set_thread_name("ossia audio 0");
418  return 0;
419  }();
420 
421  auto& self = *static_cast<pulseaudio_engine*>(userdata);
422  self.tick_start();
423 
424  auto& pa = libpulse::instance();
425  size_t bytes_to_fill = requested_bytes;
426  float* buffer = nullptr;
427 
428  auto clt = self.m_stream.load();
429  if(self.stop_processing || !clt)
430  {
431  self.tick_clear();
432  do
433  {
434  // Do nothing
435  if(auto res = pa.pa_stream_begin_write(stream, (void**)&buffer, &bytes_to_fill);
436  res != 0)
437  {
438  // we're in huge trouble
439  return;
440  }
441 
442  std::fill_n(buffer, bytes_to_fill / sizeof(float), 0);
443 
444  if(auto res = pa.pa_stream_write(
445  stream, buffer, bytes_to_fill, nullptr, 0LL, PA_SEEK_RELATIVE);
446  res != 0)
447  {
448  // we're in huge trouble
449  return;
450  }
451 
452  requested_bytes -= bytes_to_fill;
453  bytes_to_fill = requested_bytes;
454  } while(requested_bytes > 0);
455 
456  return;
457  }
458 
459  {
460 
461  do
462  {
463  auto res = pa.pa_stream_begin_write(stream, (void**)&buffer, &bytes_to_fill);
464  if(res != 0)
465  {
466  // we're in huge trouble
467  std::cerr << "no pa_stream_begin_write\n";
468  return;
469  }
470 
471  if(buffer)
472  {
473  const auto size = bytes_to_fill / (sizeof(float) * self.effective_outputs);
474  {
475  auto float_input = nullptr;
476  auto float_output = ((float*)buffer);
477 
478  float* float_outputs[2];
479  float_outputs[0] = (float*)alloca(sizeof(float) * size);
480  float_outputs[1] = (float*)alloca(sizeof(float) * size);
481 
482  pa_usec_t usec{};
483  pa.pa_stream_get_time(stream, &usec);
484 
485  ossia::audio_tick_state ts{float_input,
486  float_outputs,
487  (int)self.effective_inputs,
488  (int)self.effective_outputs,
489  size,
490  usec / 1e6};
491  self.audio_tick(ts);
492 
493  int k = 0;
494  for(std::size_t i = 0; i < size; i++)
495  {
496  float_output[k++] = float_outputs[0][i];
497  float_output[k++] = float_outputs[1][i];
498  }
499  }
500  }
501  else
502  {
503  std::cerr << "no buffer\n";
504  }
505 
506  if(auto res = pa.pa_stream_write(
507  stream, buffer, bytes_to_fill, nullptr, 0LL, PA_SEEK_RELATIVE);
508  res != 0)
509  {
510  // we're in huge trouble
511  return;
512  }
513 
514  requested_bytes -= bytes_to_fill;
515  bytes_to_fill = requested_bytes;
516  } while(requested_bytes > 0);
517  }
518 
519  self.tick_end();
520  }
521 
522  std::string m_name;
523  buffer_size_t m_frames{};
524 
525  pa_threaded_mainloop* m_mainloop{};
526  pa_mainloop_api* m_api{};
527  pa_context* m_context{};
528  std::atomic<pa_stream*> m_stream{};
529 };
530 }
531 
532 #endif
533 #endif
Definition: git_info.h:7