blob: f58910f0f1404b3b63e3161a78d96deab80a625e [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040019#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040020#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040037#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040038
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040039namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040040static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
41static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
42
43using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040044using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040045
46struct ConnectionInfo
47{
48 ~ConnectionInfo()
49 {
50 if (socket_)
51 socket_->join();
52 }
53
54 std::mutex mutex_ {};
55 bool responseReceived_ {false};
56 PeerConnectionRequest response_ {};
57 std::unique_ptr<IceTransport> ice_ {nullptr};
58 // Used to store currently non ready TLS Socket
59 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
60 std::shared_ptr<MultiplexedSocket> socket_ {};
61 std::set<CallbackId> cbIds_ {};
62
63 std::function<void(bool)> onConnected_;
64 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
65};
66
67/**
68 * returns whether or not UPnP is enabled and active_
69 * ie: if it is able to make port mappings
70 */
71bool
72ConnectionManager::Config::getUPnPActive() const
73{
74 if (upnpCtrl)
75 return upnpCtrl->isReady();
76 return false;
77}
78
79class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
80{
81public:
82 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
83 : config_ {std::move(config_)}
84 {}
85 ~Impl() {}
86
87 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
88 const dht::crypto::Identity& identity() const { return config_->id; }
89
90 void removeUnusedConnections(const DeviceId& deviceId = {})
91 {
92 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
93
94 {
95 std::lock_guard<std::mutex> lk(infosMtx_);
96 for (auto it = infos_.begin(); it != infos_.end();) {
97 auto& [key, info] = *it;
98 if (info && (!deviceId || key.first == deviceId)) {
99 unused.emplace_back(std::move(info));
100 it = infos_.erase(it);
101 } else {
102 ++it;
103 }
104 }
105 }
106 for (auto& info: unused) {
107 if (info->tls_)
108 info->tls_->shutdown();
109 if (info->socket_)
110 info->socket_->shutdown();
111 if (info->waitForAnswer_)
112 info->waitForAnswer_->cancel();
113 }
114 if (!unused.empty())
115 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
116 }
117
118 void shutdown()
119 {
120 if (isDestroying_.exchange(true))
121 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400122 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400123 {
124 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400125 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400126 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400127 for (auto& [deviceId, pcbs] : po) {
128 for (auto& [id, pending] : pcbs.connecting)
129 pending.cb(nullptr, deviceId);
130 for (auto& [id, pending] : pcbs.waiting)
131 pending.cb(nullptr, deviceId);
132 }
133
Adrien Béraud612b55b2023-05-29 10:42:04 -0400134 removeUnusedConnections();
135 }
136
Adrien Béraud612b55b2023-05-29 10:42:04 -0400137 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
138 const dht::Value::Id& vid,
139 const std::string& connType,
140 std::function<void(bool)> onConnected);
141 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
142 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
143 const std::string& name,
144 const dht::Value::Id& vid,
145 const std::shared_ptr<dht::crypto::Certificate>& cert);
146 void connectDevice(const DeviceId& deviceId,
147 const std::string& uri,
148 ConnectCallback cb,
149 bool noNewSocket = false,
150 bool forceNewSocket = false,
151 const std::string& connType = "");
152 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
153 const std::string& name,
154 ConnectCallback cb,
155 bool noNewSocket = false,
156 bool forceNewSocket = false,
157 const std::string& connType = "");
158 /**
159 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
160 * @param sock socket used to send the request
161 * @param name channel's name
162 * @param vid channel's id
163 * @param deviceId to identify the linked ConnectCallback
164 */
165 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
166 const std::string& name,
167 const DeviceId& deviceId,
168 const dht::Value::Id& vid);
169 /**
170 * Triggered when a PeerConnectionRequest comes from the DHT
171 */
172 void answerTo(IceTransport& ice,
173 const dht::Value::Id& id,
174 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
175 bool onRequestStartIce(const PeerConnectionRequest& req);
176 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
177 void onDhtPeerRequest(const PeerConnectionRequest& req,
178 const std::shared_ptr<dht::crypto::Certificate>& cert);
179
180 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
181 void onPeerResponse(const PeerConnectionRequest& req);
182 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
183
184 const std::shared_future<tls::DhParams> dhParams() const;
185 tls::CertificateStore& certStore() const { return *config_->certStore; }
186
187 mutable std::mutex messageMutex_ {};
188 std::set<std::string, std::less<>> treatedMessages_ {};
189
190 void loadTreatedMessages();
191 void saveTreatedMessages() const;
192
193 /// \return true if the given DHT message identifier has been treated
194 /// \note if message has not been treated yet this method st/ore this id and returns true at
195 /// further calls
196 bool isMessageTreated(std::string_view id);
197
198 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
199
200 /**
201 * Published IPv4/IPv6 addresses, used only if defined by the user in account
202 * configuration
203 *
204 */
205 IpAddr publishedIp_[2] {};
206
Adrien Béraud612b55b2023-05-29 10:42:04 -0400207 /**
208 * interface name on which this account is bound
209 */
210 std::string interface_ {"default"};
211
212 /**
213 * Get the local interface name on which this account is bound.
214 */
215 const std::string& getLocalInterface() const { return interface_; }
216
217 /**
218 * Get the published IP address, fallbacks to NAT if family is unspecified
219 * Prefers the usage of IPv4 if possible.
220 */
221 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
222
223 /**
224 * Set published IP address according to given family
225 */
226 void setPublishedAddress(const IpAddr& ip_addr);
227
228 /**
229 * Store the local/public addresses used to register
230 */
231 void storeActiveIpAddress(std::function<void()>&& cb = {});
232
233 /**
234 * Create and return ICE options.
235 */
236 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
237 IceTransportOptions getIceOptions() const noexcept;
238
239 /**
240 * Inform that a potential peer device have been found.
241 * Returns true only if the device certificate is a valid device certificate.
242 * In that case (true is returned) the account_id parameter is set to the peer account ID.
243 */
244 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
245 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
246
247 bool findCertificate(const dht::PkId& id,
248 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
249
250 /**
251 * returns whether or not UPnP is enabled and active
252 * ie: if it is able to make port mappings
253 */
254 bool getUPnPActive() const;
255
256 /**
257 * Triggered when a new TLS socket is ready to use
258 * @param ok If succeed
259 * @param deviceId Related device
260 * @param vid vid of the connection request
261 * @param name non empty if TLS was created by connectDevice()
262 */
263 void onTlsNegotiationDone(bool ok,
264 const DeviceId& deviceId,
265 const dht::Value::Id& vid,
266 const std::string& name = "");
267
268 std::shared_ptr<ConnectionManager::Config> config_;
269
270 IceTransportFactory iceFactory_ {};
271
272 mutable std::mt19937_64 rand;
273
274 iOSConnectedCallback iOSConnectedCb_ {};
275
276 std::mutex infosMtx_ {};
277 // Note: Someone can ask multiple sockets, so to avoid any race condition,
278 // each device can have multiple multiplexed sockets.
279 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
280
281 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
282 {
283 std::lock_guard<std::mutex> lk(infosMtx_);
284 auto it = infos_.find({deviceId, id});
285 if (it != infos_.end())
286 return it->second;
287 return {};
288 }
289
290 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
291 {
292 std::lock_guard<std::mutex> lk(infosMtx_);
293 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
294 auto& [key, value] = item;
295 return key.first == deviceId && value && value->socket_;
296 });
297 if (it != infos_.end())
298 return it->second;
299 return {};
300 }
301
302 ChannelRequestCallback channelReqCb_ {};
303 ConnectionReadyCallback connReadyCb_ {};
304 onICERequestCallback iceReqCb_ {};
305
306 /**
307 * Stores callback from connectDevice
308 * @note: each device needs a vector because several connectDevice can
309 * be done in parallel and we only want one socket
310 */
311 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400312
Adrien Béraud665294f2023-06-13 18:09:11 -0400313 struct PendingCb
314 {
315 std::string name;
316 ConnectCallback cb;
317 };
318 struct PendingOperations {
319 std::map<dht::Value::Id, PendingCb> connecting;
320 std::map<dht::Value::Id, PendingCb> waiting;
321 };
322
323 std::map<DeviceId, PendingOperations> pendingOperations_ {};
324
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400325 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400326 {
327 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400328 std::unique_lock<std::mutex> lk(connectCbsMtx_);
329 auto it = pendingOperations_.find(deviceId);
330 if (it == pendingOperations_.end())
331 return;
332 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400333 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400334 // Extract all pending callbacks
335 for (auto& [vid, cb] : pendingOperations.connecting)
336 ret.emplace_back(std::move(cb));
337 pendingOperations.connecting.clear();
338 for (auto& [vid, cb] : pendingOperations.waiting)
339 ret.emplace_back(std::move(cb));
340 pendingOperations.waiting.clear();
341 } else if (auto n = pendingOperations.waiting.extract(vid)) {
342 // If it's a waiting operation, just move it
343 ret.emplace_back(std::move(n.mapped()));
344 } else if (auto n = pendingOperations.connecting.extract(vid)) {
345 ret.emplace_back(std::move(n.mapped()));
346 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400347 // If accepted is false, it means that underlying socket is ok, but channel is declined
348 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400349 for (auto& [vid, cb] : pendingOperations.waiting)
350 ret.emplace_back(std::move(cb));
351 pendingOperations.waiting.clear();
352 for (auto& [vid, cb] : pendingOperations.connecting)
353 ret.emplace_back(std::move(cb));
354 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400355 }
356 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400357 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
358 pendingOperations_.erase(it);
359 lk.unlock();
360 for (auto& cb : ret)
361 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400362 }
363
Adrien Béraud665294f2023-06-13 18:09:11 -0400364 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400365 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400366 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400367 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400368 auto it = pendingOperations_.find(deviceId);
369 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400370 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 auto& pendingOp = it->second;
372 for (const auto& [id, pc]: pendingOp.connecting) {
373 if (vid == 0 || id == vid)
374 ret[id] = pc.name;
375 }
376 for (const auto& [id, pc]: pendingOp.waiting) {
377 if (vid == 0 || id == vid)
378 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400379 }
380 return ret;
381 }
382
383 std::shared_ptr<ConnectionManager::Impl> shared()
384 {
385 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
386 }
387 std::shared_ptr<ConnectionManager::Impl const> shared() const
388 {
389 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
390 }
391 std::weak_ptr<ConnectionManager::Impl> weak()
392 {
393 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
394 }
395 std::weak_ptr<ConnectionManager::Impl const> weak() const
396 {
397 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
398 }
399
400 std::atomic_bool isDestroying_ {false};
401};
402
403void
404ConnectionManager::Impl::connectDeviceStartIce(
405 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
406 const dht::Value::Id& vid,
407 const std::string& connType,
408 std::function<void(bool)> onConnected)
409{
410 auto deviceId = devicePk->getLongId();
411 auto info = getInfo(deviceId, vid);
412 if (!info) {
413 onConnected(false);
414 return;
415 }
416
417 std::unique_lock<std::mutex> lk(info->mutex_);
418 auto& ice = info->ice_;
419
420 if (!ice) {
421 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400422 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400423 onConnected(false);
424 return;
425 }
426
427 auto iceAttributes = ice->getLocalAttributes();
428 std::ostringstream icemsg;
429 icemsg << iceAttributes.ufrag << "\n";
430 icemsg << iceAttributes.pwd << "\n";
431 for (const auto& addr : ice->getLocalCandidates(1)) {
432 icemsg << addr << "\n";
433 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400434 config_->logger->debug("[device {}] Added local ICE candidate {}", addr, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400435 }
436
437 // Prepare connection request as a DHT message
438 PeerConnectionRequest val;
439
440 val.id = vid; /* Random id for the message unicity */
441 val.ice_msg = icemsg.str();
442 val.connType = connType;
443
444 auto value = std::make_shared<dht::Value>(std::move(val));
445 value->user_type = "peer_request";
446
447 // Send connection request through DHT
448 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400449 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400450 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
451 + devicePk->getId().toString()),
452 devicePk,
453 value,
454 [l=config_->logger,deviceId](bool ok) {
455 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400456 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400457 deviceId,
458 (ok ? "ok" : "failed"));
459 });
460 // Wait for call to onResponse() operated by DHT
461 if (isDestroying_) {
462 onConnected(true); // This avoid to wait new negotiation when destroying
463 return;
464 }
465
466 info->onConnected_ = std::move(onConnected);
467 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
468 std::chrono::steady_clock::now()
469 + DHT_MSG_TIMEOUT);
470 info->waitForAnswer_->async_wait(
471 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
472}
473
474void
475ConnectionManager::Impl::onResponse(const asio::error_code& ec,
476 const DeviceId& deviceId,
477 const dht::Value::Id& vid)
478{
479 if (ec == asio::error::operation_aborted)
480 return;
481 auto info = getInfo(deviceId, vid);
482 if (!info)
483 return;
484
485 std::unique_lock<std::mutex> lk(info->mutex_);
486 auto& ice = info->ice_;
487 if (isDestroying_) {
488 info->onConnected_(true); // The destructor can wake a pending wait here.
489 return;
490 }
491 if (!info->responseReceived_) {
492 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400493 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400494 info->onConnected_(false);
495 return;
496 }
497
498 if (!info->ice_) {
499 info->onConnected_(false);
500 return;
501 }
502
503 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
504
505 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
506 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400507 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400508 info->onConnected_(false);
509 return;
510 }
511 info->onConnected_(true);
512}
513
514bool
515ConnectionManager::Impl::connectDeviceOnNegoDone(
516 const DeviceId& deviceId,
517 const std::string& name,
518 const dht::Value::Id& vid,
519 const std::shared_ptr<dht::crypto::Certificate>& cert)
520{
521 auto info = getInfo(deviceId, vid);
522 if (!info)
523 return false;
524
525 std::unique_lock<std::mutex> lk {info->mutex_};
526 if (info->waitForAnswer_) {
527 // Negotiation is done and connected, go to handshake
528 // and avoid any cancellation at this point.
529 info->waitForAnswer_->cancel();
530 }
531 auto& ice = info->ice_;
532 if (!ice || !ice->isRunning()) {
533 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400534 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400535 return false;
536 }
537
538 // Build socket
539 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
540 std::move(ice)),
541 true);
542
543 // Negotiate a TLS session
544 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400545 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400546 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
547 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400548 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400549 identity(),
550 dhParams(),
551 *cert);
552
553 info->tls_->setOnReady(
554 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
555 bool ok) {
556 if (auto shared = w.lock())
557 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
558 });
559 return true;
560}
561
562void
563ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
564 const std::string& name,
565 ConnectCallback cb,
566 bool noNewSocket,
567 bool forceNewSocket,
568 const std::string& connType)
569{
570 if (!dht()) {
571 cb(nullptr, deviceId);
572 return;
573 }
574 if (deviceId.toString() == identity().second->getLongId().toString()) {
575 cb(nullptr, deviceId);
576 return;
577 }
578 findCertificate(deviceId,
579 [w = weak(),
580 deviceId,
581 name,
582 cb = std::move(cb),
583 noNewSocket,
584 forceNewSocket,
585 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
586 if (!cert) {
587 if (auto shared = w.lock())
588 if (shared->config_->logger)
589 shared->config_->logger->error(
590 "No valid certificate found for device {}",
591 deviceId);
592 cb(nullptr, deviceId);
593 return;
594 }
595 if (auto shared = w.lock()) {
596 shared->connectDevice(cert,
597 name,
598 std::move(cb),
599 noNewSocket,
600 forceNewSocket,
601 connType);
602 } else
603 cb(nullptr, deviceId);
604 });
605}
606
607void
608ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
609 const std::string& name,
610 ConnectCallback cb,
611 bool noNewSocket,
612 bool forceNewSocket,
613 const std::string& connType)
614{
615 // Avoid dht operation in a DHT callback to avoid deadlocks
616 dht::ThreadPool::computation().run([w = weak(),
617 name = std::move(name),
618 cert = std::move(cert),
619 cb = std::move(cb),
620 noNewSocket,
621 forceNewSocket,
622 connType] {
623 auto devicePk = cert->getSharedPublicKey();
624 auto deviceId = devicePk->getLongId();
625 auto sthis = w.lock();
626 if (!sthis || sthis->isDestroying_) {
627 cb(nullptr, deviceId);
628 return;
629 }
630 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
631 auto isConnectingToDevice = false;
632 {
633 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400634 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
635 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400636 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400637 while (pendings.connecting.find(vid) != pendings.connecting.end()
638 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400639 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400640 }
641 }
642 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400643 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400644 // Save current request for sendChannelRequest.
645 // Note: do not return here, cause we can be in a state where first
646 // socket is negotiated and first channel is pending
647 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400648 if (isConnectingToDevice && !forceNewSocket)
649 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400650 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400651 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400652 }
653
654 // Check if already negotiated
655 CallbackId cbId(deviceId, vid);
656 if (auto info = sthis->getConnectedInfo(deviceId)) {
657 std::lock_guard<std::mutex> lk(info->mutex_);
658 if (info->socket_) {
659 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400660 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400661 info->cbIds_.emplace(cbId);
662 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
663 return;
664 }
665 }
666
667 if (isConnectingToDevice && !forceNewSocket) {
668 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400669 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400670 return;
671 }
672 if (noNewSocket) {
673 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400674 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400675 return;
676 }
677
678 // Note: used when the ice negotiation fails to erase
679 // all stored structures.
680 auto eraseInfo = [w, cbId] {
681 if (auto shared = w.lock()) {
682 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400683 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400684 std::lock_guard<std::mutex> lk(shared->infosMtx_);
685 shared->infos_.erase(cbId);
686 }
687 };
688
689 // If no socket exists, we need to initiate an ICE connection.
690 sthis->getIceOptions([w,
691 deviceId = std::move(deviceId),
692 devicePk = std::move(devicePk),
693 name = std::move(name),
694 cert = std::move(cert),
695 vid,
696 connType,
697 eraseInfo](auto&& ice_config) {
698 auto sthis = w.lock();
699 if (!sthis) {
700 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
701 return;
702 }
703 ice_config.tcpEnable = true;
704 ice_config.onInitDone = [w,
705 deviceId = std::move(deviceId),
706 devicePk = std::move(devicePk),
707 name = std::move(name),
708 cert = std::move(cert),
709 vid,
710 connType,
711 eraseInfo](bool ok) {
712 dht::ThreadPool::io().run([w = std::move(w),
713 devicePk = std::move(devicePk),
714 vid = std::move(vid),
715 eraseInfo,
716 connType, ok] {
717 auto sthis = w.lock();
718 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400719 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400720 if (!sthis || !ok) {
721 eraseInfo();
722 return;
723 }
724 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
725 if (!ok) {
726 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
727 }
728 });
729 });
730 };
731 ice_config.onNegoDone = [w,
732 deviceId,
733 name,
734 cert = std::move(cert),
735 vid,
736 eraseInfo](bool ok) {
737 dht::ThreadPool::io().run([w = std::move(w),
738 deviceId = std::move(deviceId),
739 name = std::move(name),
740 cert = std::move(cert),
741 vid = std::move(vid),
742 eraseInfo = std::move(eraseInfo),
743 ok] {
744 auto sthis = w.lock();
745 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400746 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400747 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
748 eraseInfo();
749 });
750 };
751
752 auto info = std::make_shared<ConnectionInfo>();
753 {
754 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
755 sthis->infos_[{deviceId, vid}] = info;
756 }
757 std::unique_lock<std::mutex> lk {info->mutex_};
758 ice_config.master = false;
759 ice_config.streamsCount = 1;
760 ice_config.compCountPerStream = 1;
761 info->ice_ = sthis->iceFactory_.createUTransport("");
762 if (!info->ice_) {
763 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400764 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400765 eraseInfo();
766 return;
767 }
768 // We need to detect any shutdown if the ice session is destroyed before going to the
769 // TLS session;
770 info->ice_->setOnShutdown([eraseInfo]() {
771 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
772 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400773 try {
774 info->ice_->initIceInstance(ice_config);
775 } catch (const std::exception& e) {
776 if (sthis->config_->logger)
777 sthis->config_->logger->error("{}", e.what());
778 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
779 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400780 });
781 });
782}
783
784void
785ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
786 const std::string& name,
787 const DeviceId& deviceId,
788 const dht::Value::Id& vid)
789{
790 auto channelSock = sock->addChannel(name);
791 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
792 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400793 if (auto shared = w.lock())
794 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400795 });
796 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400797 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400798 auto shared = w.lock();
799 auto channelSock = wSock.lock();
800 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400801 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400802 });
803
804 ChannelRequest val;
805 val.name = channelSock->name();
806 val.state = ChannelRequestState::REQUEST;
807 val.channel = channelSock->channel();
808 msgpack::sbuffer buffer(256);
809 msgpack::pack(buffer, val);
810
811 std::error_code ec;
812 int res = sock->write(CONTROL_CHANNEL,
813 reinterpret_cast<const uint8_t*>(buffer.data()),
814 buffer.size(),
815 ec);
816 if (res < 0) {
817 // TODO check if we should handle errors here
818 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400819 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400820 }
821}
822
823void
824ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
825{
826 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400827 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400828 if (config_->logger)
829 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400830 std::lock_guard<std::mutex> lk {info->mutex_};
831 info->responseReceived_ = true;
832 info->response_ = std::move(req);
833 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
834 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
835 this,
836 std::placeholders::_1,
837 device,
838 req.id));
839 } else {
840 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400841 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400842 }
843}
844
845void
846ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
847{
848 if (!dht())
849 return;
850 dht()->listen<PeerConnectionRequest>(
851 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
852 [w = weak()](PeerConnectionRequest&& req) {
853 auto shared = w.lock();
854 if (!shared)
855 return false;
856 if (shared->isMessageTreated(to_hex_string(req.id))) {
857 // Message already treated. Just ignore
858 return true;
859 }
860 if (req.isAnswer) {
861 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400862 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400863 } else {
864 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400865 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400866 }
867 if (req.isAnswer) {
868 shared->onPeerResponse(req);
869 } else {
870 // Async certificate checking
871 shared->dht()->findCertificate(
872 req.from,
873 [w, req = std::move(req)](
874 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
875 auto shared = w.lock();
876 if (!shared)
877 return;
878 dht::InfoHash peer_h;
879 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
880#if TARGET_OS_IOS
881 if (shared->iOSConnectedCb_(req.connType, peer_h))
882 return;
883#endif
884 shared->onDhtPeerRequest(req, cert);
885 } else {
886 if (shared->config_->logger)
887 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400888 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400889 req.owner->getLongId());
890 }
891 });
892 }
893
894 return true;
895 },
896 dht::Value::UserTypeFilter("peer_request"));
897}
898
899void
900ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
901 const DeviceId& deviceId,
902 const dht::Value::Id& vid,
903 const std::string& name)
904{
905 if (isDestroying_)
906 return;
907 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
908 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
909 // asked yet)
910 auto isDhtRequest = name.empty();
911 if (!ok) {
912 if (isDhtRequest) {
913 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400914 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400915 deviceId,
916 name,
917 vid);
918 if (connReadyCb_)
919 connReadyCb_(deviceId, "", nullptr);
920 } else {
921 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400922 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400923 deviceId,
924 name,
925 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400926 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400927 }
928 } else {
929 // The socket is ready, store it
930 if (isDhtRequest) {
931 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400932 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400933 deviceId,
934 vid);
935 } else {
936 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400937 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400938 deviceId,
939 name,
940 vid);
941 }
942
943 auto info = getInfo(deviceId, vid);
944 addNewMultiplexedSocket({deviceId, vid}, info);
945 // Finally, open the channel and launch pending callbacks
946 if (info->socket_) {
947 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400948 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400949 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400950 config_->logger->debug("[device {}] Send request on TLS socket for channel {} to {}",
951 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -0400952 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400953 }
954 }
955 }
956}
957
958void
959ConnectionManager::Impl::answerTo(IceTransport& ice,
960 const dht::Value::Id& id,
961 const std::shared_ptr<dht::crypto::PublicKey>& from)
962{
963 // NOTE: This is a shortest version of a real SDP message to save some bits
964 auto iceAttributes = ice.getLocalAttributes();
965 std::ostringstream icemsg;
966 icemsg << iceAttributes.ufrag << "\n";
967 icemsg << iceAttributes.pwd << "\n";
968 for (const auto& addr : ice.getLocalCandidates(1)) {
969 icemsg << addr << "\n";
970 }
971
972 // Send PeerConnection response
973 PeerConnectionRequest val;
974 val.id = id;
975 val.ice_msg = icemsg.str();
976 val.isAnswer = true;
977 auto value = std::make_shared<dht::Value>(std::move(val));
978 value->user_type = "peer_request";
979
980 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400981 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400982 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
983 + from->getId().toString()),
984 from,
985 value,
986 [from,l=config_->logger](bool ok) {
987 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400988 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400989 from->getLongId(),
990 (ok ? "ok" : "failed"));
991 });
992}
993
994bool
995ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
996{
997 auto deviceId = req.owner->getLongId();
998 auto info = getInfo(deviceId, req.id);
999 if (!info)
1000 return false;
1001
1002 std::unique_lock<std::mutex> lk {info->mutex_};
1003 auto& ice = info->ice_;
1004 if (!ice) {
1005 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001006 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001007 if (connReadyCb_)
1008 connReadyCb_(deviceId, "", nullptr);
1009 return false;
1010 }
1011
1012 auto sdp = ice->parseIceCandidates(req.ice_msg);
1013 answerTo(*ice, req.id, req.owner);
1014 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1015 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001016 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001017 ice = nullptr;
1018 if (connReadyCb_)
1019 connReadyCb_(deviceId, "", nullptr);
1020 return false;
1021 }
1022 return true;
1023}
1024
1025bool
1026ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1027{
1028 auto deviceId = req.owner->getLongId();
1029 auto info = getInfo(deviceId, req.id);
1030 if (!info)
1031 return false;
1032
1033 std::unique_lock<std::mutex> lk {info->mutex_};
1034 auto& ice = info->ice_;
1035 if (!ice) {
1036 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001037 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001038 return false;
1039 }
1040
1041 // Build socket
1042 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1043 std::move(ice)),
1044 false);
1045
1046 // init TLS session
1047 auto ph = req.from;
1048 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001049 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1050 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001051 req.id);
1052 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1053 std::move(endpoint),
1054 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001055 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001056 identity(),
1057 dhParams(),
1058 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1059 auto shared = w.lock();
1060 if (!shared)
1061 return false;
1062 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1063 if (!crt)
1064 return false;
1065 return crt->getPacked() == cert.getPacked();
1066 });
1067
1068 info->tls_->setOnReady(
1069 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1070 if (auto shared = w.lock())
1071 shared->onTlsNegotiationDone(ok, deviceId, vid);
1072 });
1073 return true;
1074}
1075
1076void
1077ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1078 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1079{
1080 auto deviceId = req.owner->getLongId();
1081 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001082 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001083 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1084 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001085 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001086 return;
1087 }
1088
1089 // Because the connection is accepted, create an ICE socket.
1090 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1091 auto shared = w.lock();
1092 if (!shared)
1093 return;
1094 // Note: used when the ice negotiation fails to erase
1095 // all stored structures.
1096 auto eraseInfo = [w, id = req.id, deviceId] {
1097 if (auto shared = w.lock()) {
1098 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001099 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001100 if (shared->connReadyCb_)
1101 shared->connReadyCb_(deviceId, "", nullptr);
1102 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1103 shared->infos_.erase({deviceId, id});
1104 }
1105 };
1106
1107 ice_config.tcpEnable = true;
1108 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1109 auto shared = w.lock();
1110 if (!shared)
1111 return;
1112 if (!ok) {
1113 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001114 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001115 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1116 return;
1117 }
1118
1119 dht::ThreadPool::io().run(
1120 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1121 auto shared = w.lock();
1122 if (!shared)
1123 return;
1124 if (!shared->onRequestStartIce(req))
1125 eraseInfo();
1126 });
1127 };
1128
1129 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1130 auto shared = w.lock();
1131 if (!shared)
1132 return;
1133 if (!ok) {
1134 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001135 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001136 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1137 return;
1138 }
1139
1140 dht::ThreadPool::io().run(
1141 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1142 if (auto shared = w.lock())
1143 if (!shared->onRequestOnNegoDone(req))
1144 eraseInfo();
1145 });
1146 };
1147
1148 // Negotiate a new ICE socket
1149 auto info = std::make_shared<ConnectionInfo>();
1150 {
1151 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1152 shared->infos_[{deviceId, req.id}] = info;
1153 }
1154 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001155 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001156 std::unique_lock<std::mutex> lk {info->mutex_};
1157 ice_config.streamsCount = 1;
1158 ice_config.compCountPerStream = 1; // TCP
1159 ice_config.master = true;
1160 info->ice_ = shared->iceFactory_.createUTransport("");
1161 if (not info->ice_) {
1162 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001163 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001164 eraseInfo();
1165 return;
1166 }
1167 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1168 info->ice_->setOnShutdown([eraseInfo]() {
1169 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1170 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001171 try {
1172 info->ice_->initIceInstance(ice_config);
1173 } catch (const std::exception& e) {
1174 if (shared->config_->logger)
1175 shared->config_->logger->error("{}", e.what());
1176 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1177 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001178 });
1179}
1180
1181void
1182ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1183{
1184 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1185 info->socket_->setOnReady(
1186 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1187 if (auto sthis = w.lock())
1188 if (sthis->connReadyCb_)
1189 sthis->connReadyCb_(deviceId, socket->name(), socket);
1190 });
1191 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1192 const uint16_t&,
1193 const std::string& name) {
1194 if (auto sthis = w.lock())
1195 if (sthis->channelReqCb_)
1196 return sthis->channelReqCb_(peer, name);
1197 return false;
1198 });
1199 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1200 // Cancel current outgoing connections
1201 dht::ThreadPool::io().run([w, deviceId, vid] {
1202 auto sthis = w.lock();
1203 if (!sthis)
1204 return;
1205
1206 std::set<CallbackId> ids;
1207 if (auto info = sthis->getInfo(deviceId, vid)) {
1208 std::lock_guard<std::mutex> lk(info->mutex_);
1209 if (info->socket_) {
1210 ids = std::move(info->cbIds_);
1211 info->socket_->shutdown();
1212 }
1213 }
1214 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001215 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001216
1217 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1218 sthis->infos_.erase({deviceId, vid});
1219 });
1220 });
1221}
1222
1223const std::shared_future<tls::DhParams>
1224ConnectionManager::Impl::dhParams() const
1225{
1226 return dht::ThreadPool::computation().get<tls::DhParams>(
1227 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1228 ;
1229}
1230
1231template<typename ID = dht::Value::Id>
1232std::set<ID, std::less<>>
1233loadIdList(const std::string& path)
1234{
1235 std::set<ID, std::less<>> ids;
1236 std::ifstream file = fileutils::ifstream(path);
1237 if (!file.is_open()) {
1238 //JAMI_DBG("Could not load %s", path.c_str());
1239 return ids;
1240 }
1241 std::string line;
1242 while (std::getline(file, line)) {
1243 if constexpr (std::is_same<ID, std::string>::value) {
1244 ids.emplace(std::move(line));
1245 } else if constexpr (std::is_integral<ID>::value) {
1246 ID vid;
1247 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1248 ec == std::errc()) {
1249 ids.emplace(vid);
1250 }
1251 }
1252 }
1253 return ids;
1254}
1255
1256template<typename List = std::set<dht::Value::Id>>
1257void
1258saveIdList(const std::string& path, const List& ids)
1259{
1260 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1261 if (!file.is_open()) {
1262 //JAMI_ERR("Could not save to %s", path.c_str());
1263 return;
1264 }
1265 for (auto& c : ids)
1266 file << std::hex << c << "\n";
1267}
1268
1269void
1270ConnectionManager::Impl::loadTreatedMessages()
1271{
1272 std::lock_guard<std::mutex> lock(messageMutex_);
1273 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1274 treatedMessages_ = loadIdList<std::string>(path);
1275 if (treatedMessages_.empty()) {
1276 auto messages = loadIdList(path);
1277 for (const auto& m : messages)
1278 treatedMessages_.emplace(to_hex_string(m));
1279 }
1280}
1281
1282void
1283ConnectionManager::Impl::saveTreatedMessages() const
1284{
1285 dht::ThreadPool::io().run([w = weak()]() {
1286 if (auto sthis = w.lock()) {
1287 auto& this_ = *sthis;
1288 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1289 fileutils::check_dir(this_.config_->cachePath.c_str());
1290 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1291 + DIR_SEPARATOR_STR "treatedMessages",
1292 this_.treatedMessages_);
1293 }
1294 });
1295}
1296
1297bool
1298ConnectionManager::Impl::isMessageTreated(std::string_view id)
1299{
1300 std::lock_guard<std::mutex> lock(messageMutex_);
1301 auto res = treatedMessages_.emplace(id);
1302 if (res.second) {
1303 saveTreatedMessages();
1304 return false;
1305 }
1306 return true;
1307}
1308
1309/**
1310 * returns whether or not UPnP is enabled and active_
1311 * ie: if it is able to make port mappings
1312 */
1313bool
1314ConnectionManager::Impl::getUPnPActive() const
1315{
1316 return config_->getUPnPActive();
1317}
1318
1319IpAddr
1320ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1321{
1322 if (family == AF_INET)
1323 return publishedIp_[0];
1324 if (family == AF_INET6)
1325 return publishedIp_[1];
1326
1327 assert(family == AF_UNSPEC);
1328
1329 // If family is not set, prefere IPv4 if available. It's more
1330 // likely to succeed behind NAT.
1331 if (publishedIp_[0])
1332 return publishedIp_[0];
1333 if (publishedIp_[1])
1334 return publishedIp_[1];
1335 return {};
1336}
1337
1338void
1339ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1340{
1341 if (ip_addr.getFamily() == AF_INET) {
1342 publishedIp_[0] = ip_addr;
1343 } else {
1344 publishedIp_[1] = ip_addr;
1345 }
1346}
1347
1348void
1349ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1350{
1351 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1352 bool hasIpv4 {false}, hasIpv6 {false};
1353 for (auto& result : results) {
1354 auto family = result.getFamily();
1355 if (family == AF_INET) {
1356 if (not hasIpv4) {
1357 hasIpv4 = true;
1358 if (config_->logger)
1359 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1360 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1361 setPublishedAddress(*result.get());
1362 if (config_->upnpCtrl) {
1363 config_->upnpCtrl->setPublicAddress(*result.get());
1364 }
1365 }
1366 } else if (family == AF_INET6) {
1367 if (not hasIpv6) {
1368 hasIpv6 = true;
1369 if (config_->logger)
1370 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1371 setPublishedAddress(*result.get());
1372 }
1373 }
1374 if (hasIpv4 and hasIpv6)
1375 break;
1376 }
1377 if (cb)
1378 cb();
1379 });
1380}
1381
1382void
1383ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1384{
1385 storeActiveIpAddress([this, cb = std::move(cb)] {
1386 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1387 auto publishedAddr = getPublishedIpAddress();
1388
1389 if (publishedAddr) {
1390 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1391 publishedAddr.getFamily());
1392 if (interfaceAddr) {
1393 opts.accountLocalAddr = interfaceAddr;
1394 opts.accountPublicAddr = publishedAddr;
1395 }
1396 }
1397 if (cb)
1398 cb(std::move(opts));
1399 });
1400}
1401
1402IceTransportOptions
1403ConnectionManager::Impl::getIceOptions() const noexcept
1404{
1405 IceTransportOptions opts;
Adrien Béraudf7081d32023-07-19 23:02:11 -04001406 opts.factory = (IceTransportFactory*)&iceFactory_;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001407 opts.upnpEnable = getUPnPActive();
1408
1409 if (config_->stunEnabled)
1410 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1411 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001412 if (config_->turnCache) {
1413 auto turnAddr = config_->turnCache->getResolvedTurn();
1414 if (turnAddr != std::nullopt) {
1415 opts.turnServers.emplace_back(TurnServerInfo()
1416 .setUri(turnAddr->toString())
1417 .setUsername(config_->turnServerUserName)
1418 .setPassword(config_->turnServerPwd)
1419 .setRealm(config_->turnServerRealm));
1420 }
1421 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001422 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001423 .setUri(config_->turnServer)
1424 .setUsername(config_->turnServerUserName)
1425 .setPassword(config_->turnServerPwd)
1426 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001427 }
1428 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1429 // co issues. So this needs some debug. for now just disable
1430 // if (cacheTurnV6 && *cacheTurnV6) {
1431 // opts.turnServers.emplace_back(TurnServerInfo()
1432 // .setUri(cacheTurnV6->toString(true))
1433 // .setUsername(turnServerUserName_)
1434 // .setPassword(turnServerPwd_)
1435 // .setRealm(turnServerRealm_));
1436 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001437 }
1438 return opts;
1439}
1440
1441bool
1442ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1443 dht::InfoHash& account_id,
1444 const std::shared_ptr<Logger>& logger)
1445{
1446 if (not crt)
1447 return false;
1448
1449 auto top_issuer = crt;
1450 while (top_issuer->issuer)
1451 top_issuer = top_issuer->issuer;
1452
1453 // Device certificate can't be self-signed
1454 if (top_issuer == crt) {
1455 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001456 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001457 return false;
1458 }
1459
1460 // Check peer certificate chain
1461 // Trust store with top issuer as the only CA
1462 dht::crypto::TrustList peer_trust;
1463 peer_trust.add(*top_issuer);
1464 if (not peer_trust.verify(*crt)) {
1465 if (logger)
1466 logger->warn("Found invalid peer device: {}", crt->getLongId());
1467 return false;
1468 }
1469
1470 // Check cached OCSP response
1471 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1472 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001473 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001474 return false;
1475 }
1476
1477 account_id = crt->issuer->getId();
1478 if (logger)
1479 logger->warn("Found peer device: {} account:{} CA:{}",
1480 crt->getLongId(),
1481 account_id,
1482 top_issuer->getId());
1483 return true;
1484}
1485
1486bool
1487ConnectionManager::Impl::findCertificate(
1488 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1489{
1490 if (auto cert = certStore().getCertificate(id.toString())) {
1491 if (cb)
1492 cb(cert);
1493 } else if (cb)
1494 cb(nullptr);
1495 return true;
1496}
1497
1498ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1499 : pimpl_ {std::make_shared<Impl>(config_)}
1500{}
1501
1502ConnectionManager::~ConnectionManager()
1503{
1504 if (pimpl_)
1505 pimpl_->shutdown();
1506}
1507
1508void
1509ConnectionManager::connectDevice(const DeviceId& deviceId,
1510 const std::string& name,
1511 ConnectCallback cb,
1512 bool noNewSocket,
1513 bool forceNewSocket,
1514 const std::string& connType)
1515{
1516 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1517}
1518
1519void
1520ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1521 const std::string& name,
1522 ConnectCallback cb,
1523 bool noNewSocket,
1524 bool forceNewSocket,
1525 const std::string& connType)
1526{
1527 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1528}
1529
1530bool
1531ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1532{
Adrien Béraud665294f2023-06-13 18:09:11 -04001533 auto pending = pimpl_->getPendingIds(deviceId);
1534 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001535 != pending.end();
1536}
1537
1538void
1539ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1540{
1541 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1542 std::set<DeviceId> peersDevices;
1543 {
1544 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1545 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1546 auto const& [key, value] = *iter;
1547 auto deviceId = key.first;
1548 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1549 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1550 connInfos.emplace_back(value);
1551 peersDevices.emplace(deviceId);
1552 iter = pimpl_->infos_.erase(iter);
1553 } else {
1554 iter++;
1555 }
1556 }
1557 }
1558 // Stop connections to all peers devices
1559 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001560 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001561 // This will close the TLS Session
1562 pimpl_->removeUnusedConnections(deviceId);
1563 }
1564 for (auto& info : connInfos) {
1565 if (info->socket_)
1566 info->socket_->shutdown();
1567 if (info->waitForAnswer_)
1568 info->waitForAnswer_->cancel();
1569 if (info->ice_) {
1570 std::unique_lock<std::mutex> lk {info->mutex_};
1571 dht::ThreadPool::io().run(
1572 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1573 }
1574 }
1575}
1576
1577void
1578ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1579{
1580 pimpl_->onDhtConnected(devicePk);
1581}
1582
1583void
1584ConnectionManager::onICERequest(onICERequestCallback&& cb)
1585{
1586 pimpl_->iceReqCb_ = std::move(cb);
1587}
1588
1589void
1590ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1591{
1592 pimpl_->channelReqCb_ = std::move(cb);
1593}
1594
1595void
1596ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1597{
1598 pimpl_->connReadyCb_ = std::move(cb);
1599}
1600
1601void
1602ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1603{
1604 pimpl_->iOSConnectedCb_ = std::move(cb);
1605}
1606
1607std::size_t
1608ConnectionManager::activeSockets() const
1609{
1610 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1611 return pimpl_->infos_.size();
1612}
1613
1614void
1615ConnectionManager::monitor() const
1616{
1617 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1618 auto logger = pimpl_->config_->logger;
1619 if (!logger)
1620 return;
1621 logger->debug("ConnectionManager current status:");
1622 for (const auto& [_, ci] : pimpl_->infos_) {
1623 if (ci->socket_)
1624 ci->socket_->monitor();
1625 }
1626 logger->debug("ConnectionManager end status.");
1627}
1628
1629void
1630ConnectionManager::connectivityChanged()
1631{
1632 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1633 for (const auto& [_, ci] : pimpl_->infos_) {
1634 if (ci->socket_)
1635 ci->socket_->sendBeacon();
1636 }
1637}
1638
1639void
1640ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1641{
1642 return pimpl_->getIceOptions(std::move(cb));
1643}
1644
1645IceTransportOptions
1646ConnectionManager::getIceOptions() const noexcept
1647{
1648 return pimpl_->getIceOptions();
1649}
1650
1651IpAddr
1652ConnectionManager::getPublishedIpAddress(uint16_t family) const
1653{
1654 return pimpl_->getPublishedIpAddress(family);
1655}
1656
1657void
1658ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1659{
1660 return pimpl_->setPublishedAddress(ip_addr);
1661}
1662
1663void
1664ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1665{
1666 return pimpl_->storeActiveIpAddress(std::move(cb));
1667}
1668
1669std::shared_ptr<ConnectionManager::Config>
1670ConnectionManager::getConfig()
1671{
1672 return pimpl_->config_;
1673}
1674
Sébastien Blin464bdff2023-07-19 08:02:53 -04001675} // namespace dhtnet