OSSIA
Open Scenario System for Interactive Application
websocket_log_sink.hpp
1 #pragma once
2 #include <ossia/detail/hash_map.hpp>
3 #include <ossia/detail/json.hpp>
4 #include <ossia/detail/nullable_variant.hpp>
5 #include <ossia/network/sockets/websocket_client.hpp>
6 
7 #include <spdlog/sinks/stdout_sinks.h>
8 #include <spdlog/spdlog.h>
9 
10 #include <readerwriterqueue.h>
11 
12 #include <atomic>
13 
14 namespace ossia
15 {
16 struct websocket_threaded_connection
17 {
18  websocket_threaded_connection(const std::string& ip)
19  : socket([](auto&&...) {})
20  {
21  running = true;
22  thread = std::thread([this, ip] {
23  auto log = spdlog::get("websocket");
24  if(!log)
25  log = spdlog::stderr_logger_mt("websocket");
26  try
27  {
28  while(running)
29  {
30  socket.connect_and_run(ip);
31  if(running)
32  {
33  // Try to reconnect
34  log->critical("Logger could not connect to {}.", ip);
35  std::this_thread::sleep_for(std::chrono::seconds(1));
36  }
37  }
38  log->critical("Logger stopping.");
39  }
40  catch(const websocketpp::exception& e)
41  {
42  log->critical("Logger error: ", e.what());
43  }
44  catch(const std::exception& e)
45  {
46  log->critical("Logger error: ", e.what());
47  }
48  catch(...)
49  {
50  log->critical("Logger error");
51  }
52  });
53  }
54 
55  ~websocket_threaded_connection()
56  {
57  running = false;
58  if(!socket.after_connect())
59  std::this_thread::sleep_for(std::chrono::milliseconds(500));
60  socket.stop();
61  if(thread.joinable())
62  thread.join();
63  }
64 
66  std::atomic_bool running{};
67  std::thread thread;
68 };
69 
71 struct websocket_log_sink final
72  : public spdlog::sinks::sink
73  , public Nano::Observer
74 {
75  websocket_log_sink(std::shared_ptr<websocket_threaded_connection> s, std::string send)
76  : socket{std::move(s)}
77  , sender{std::move(send)}
78  {
79  socket->socket.on_open.connect<&websocket_log_sink::open_fun>(this);
80  }
81 
82  void open_fun()
83  {
84  std::string m;
85  while(logs.try_dequeue(m))
86  {
87  socket->socket.send_message(m);
88  }
89  }
90 
91  ~websocket_log_sink() override
92  {
93  socket->socket.on_open.disconnect<&websocket_log_sink::open_fun>(this);
94  }
95 
96  void make_message(const spdlog::details::log_msg& msg)
97  {
98  buffer.Clear();
99 
100  ossia::json_writer writer{buffer};
101 
102  writer.StartObject();
103 
104  writer.Key("operation");
105  writer.String("log");
106 
107  writer.Key("level");
108  switch(msg.level)
109  {
110  case spdlog::level::trace:
111  writer.String("trace");
112  break;
113  case spdlog::level::debug:
114  writer.String("debug");
115  break;
116  case spdlog::level::info:
117  writer.String("info");
118  break;
119  case spdlog::level::warn:
120  writer.String("warn");
121  break;
122  case spdlog::level::err:
123  writer.String("error");
124  break;
125  case spdlog::level::critical:
126  writer.String("critical");
127  break;
128  case spdlog::level::off:
129  writer.String("off");
130  break;
131  }
132 
133  writer.Key("sender");
134  writer.String(sender.data(), sender.size());
135 
136  writer.Key("message");
137  writer.String(msg.payload.data(), msg.payload.size());
138 
139  writer.EndObject();
140  }
141 
142  void send_message(const spdlog::details::log_msg& msg)
143  {
144  make_message(msg);
145  socket->socket.send_message(buffer);
146  }
147 
148  void log(const spdlog::details::log_msg& msg) override
149  {
150  if(!socket->socket.connected())
151  {
152  make_message(msg);
153  logs.enqueue(std::string{buffer.GetString(), buffer.GetSize()});
154  return;
155  }
156  else
157  {
158  send_message(msg);
159  }
160  }
161 
162  void flush() override { }
163 
164  void set_pattern(const std::string& pattern) override { }
165  void set_formatter(std::unique_ptr<spdlog::formatter> sink_formatter) override { }
166 
167 private:
168  rapidjson::StringBuffer buffer;
169  std::shared_ptr<websocket_threaded_connection> socket;
170  std::string sender;
171 
172  moodycamel::ReaderWriterQueue<std::string> logs;
173 };
174 
176 struct websocket_heartbeat : public Nano::Observer
177 {
178 public:
180  std::shared_ptr<websocket_threaded_connection> t, std::string s,
181  std::chrono::seconds dur)
182  : interval{dur}
183  , sender{s}
184  , conn{t}
185  {
186  thread = std::thread([this] {
187  while(running)
188  {
189  if(init && conn->socket.connected())
190  {
191  buffer.Clear();
192  ossia::json_writer writer{buffer};
193  writer.StartObject();
194 
195  writer.Key("operation");
196  writer.String("alive");
197 
198  writer.Key("sender");
199  writer.String(sender.data(), sender.size());
200 
201  writer.EndObject();
202 
203  conn->socket.send_message(buffer);
204  }
205 
206  for(int i = 0; i < 100; i++)
207  {
208  std::this_thread::sleep_for(
209  std::chrono::duration_cast<std::chrono::milliseconds>(interval) / 100.0);
210  if(!running)
211  return;
212  }
213  }
214  });
215 
216  t->socket.on_open.connect<&websocket_heartbeat::open_fun>(*this);
217  }
218 
219  void open_fun()
220  {
221  conn->socket.send_message(init_msg);
222  init = true;
223  }
224 
225  ~websocket_heartbeat()
226  {
227  conn->socket.on_open.disconnect<&websocket_heartbeat::open_fun>(*this);
228  running = false;
229  if(thread.joinable())
230  thread.join();
231  }
232 
233  void
234  send_init(const ossia::hash_map<std::string, ossia::variant<std::string, int>>& map)
235  {
236  rapidjson::StringBuffer buffer;
237  ossia::json_writer writer{buffer};
238  writer.StartObject();
239 
240  writer.Key("operation");
241  writer.String("initWatchdog");
242 
243  writer.Key("sender");
244  writer.String(sender.data(), sender.size());
245 
246  writer.Key("aliveTime");
247  writer.Int(interval.count());
248 
249  struct
250  {
251  ossia::json_writer& writer;
252  void operator()(const std::string& s) { writer.String(s); }
253 
254  void operator()(int s) { writer.Int(s); }
255  } sw{writer};
256 
257  for(const auto& pair : map)
258  {
259  writer.Key(pair.first);
260  ossia::visit(sw, pair.second);
261  }
262 
263  writer.EndObject();
264 
265  if(conn->socket.connected())
266  {
267  conn->socket.send_message(buffer);
268  init = true;
269  }
270  else
271  {
272  init_msg = std::string(buffer.GetString(), buffer.GetSize());
273  }
274  }
275 
276 private:
277  rapidjson::StringBuffer buffer;
278  std::thread thread;
279  std::chrono::seconds interval;
280  std::string sender;
281  std::string init_msg;
282  std::shared_ptr<websocket_threaded_connection> conn;
283  std::atomic_bool running{true};
284  std::atomic_bool init{false};
285 };
286 }
Low-level Websocket client.
Definition: websocket_client.hpp:18
Definition: git_info.h:7
Sends websocket "alive" messages at regular intervals.
Definition: websocket_log_sink.hpp:177
A sink to use with spdlog, that will send its log messages over websockets.
Definition: websocket_log_sink.hpp:74