/*
 *  Copyright (C) 2004-2023 Savoir-faire Linux Inc.
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
#include "connectionmanager.h"
#include "peer_connection.h"
#include "upnp/upnp_control.h"
#include "certstore.h"
#include "fileutils.h"
#include "sip_utils.h"
#include "string_utils.h"

#include <opendht/crypto.h>
#include <opendht/thread_pool.h>
#include <opendht/value.h>
#include <asio.hpp>

#include <algorithm>
#include <mutex>
#include <map>
#include <condition_variable>
#include <set>
#include <charconv>
#include <fstream>
#include <stdexcept>

Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud75754b22023-10-17 09:16:06 -040043
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Adrien Béraud75754b22023-10-17 09:16:06 -040050std::pair<dhtnet::DeviceId, dht::Value::Id> parseCallbackId(std::string_view ci)
Amna31791e52023-08-03 12:40:57 -040051{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
Adrien Béraud75754b22023-10-17 09:16:06 -040058 return {deviceId, vid};
Amna31791e52023-08-03 12:40:57 -040059}
Amna81221ad2023-09-14 17:33:26 -040060
61std::shared_ptr<ConnectionManager::Config>
62createConfig(std::shared_ptr<ConnectionManager::Config> config_)
63{
64 if (!config_->certStore){
65 config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
66 }
67 if (!config_->dht) {
68 dht::DhtRunner::Config dhtConfig;
69 dhtConfig.dht_config.id = config_->id;
70 dhtConfig.threaded = true;
71 dht::DhtRunner::Context dhtContext;
72 dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
73 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
74 if (auto cert = c->getCertificate(pk_id.toString()))
75 ret.emplace_back(std::move(cert));
76 return ret;
77 };
78 config_->dht = std::make_shared<dht::DhtRunner>();
79 config_->dht->run(dhtConfig, std::move(dhtContext));
80 config_->dht->bootstrap("bootstrap.jami.net");
81 }
82 if (!config_->factory){
83 config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
84 }
85 return config_;
86}
87
Adrien Béraud612b55b2023-05-29 10:42:04 -040088struct ConnectionInfo
89{
90 ~ConnectionInfo()
91 {
92 if (socket_)
93 socket_->join();
94 }
95
96 std::mutex mutex_ {};
97 bool responseReceived_ {false};
98 PeerConnectionRequest response_ {};
99 std::unique_ptr<IceTransport> ice_ {nullptr};
100 // Used to store currently non ready TLS Socket
101 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
102 std::shared_ptr<MultiplexedSocket> socket_ {};
Adrien Béraud75754b22023-10-17 09:16:06 -0400103 std::set<dht::Value::Id> cbIds_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400104
105 std::function<void(bool)> onConnected_;
106 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
Adrien Béraud75754b22023-10-17 09:16:06 -0400107
108 void shutdown() {
109 std::lock_guard<std::mutex> lk(mutex_);
110 if (tls_)
111 tls_->shutdown();
112 if (socket_)
113 socket_->shutdown();
114 if (waitForAnswer_)
115 waitForAnswer_->cancel();
116 if (ice_) {
117 dht::ThreadPool::io().run(
118 [ice = std::shared_ptr<IceTransport>(std::move(ice_))] {});
119 }
120 }
121
122 std::map<std::string, std::string>
123 getInfo(const DeviceId& deviceId, dht::Value::Id valueId, tls::CertificateStore& certStore) const
124 {
125 std::map<std::string, std::string> connectionInfo;
126 connectionInfo["id"] = callbackIdToString(deviceId, valueId);
127 connectionInfo["device"] = deviceId.toString();
128 auto cert = tls_ ? tls_->peerCertificate() : (socket_ ? socket_->peerCertificate() : nullptr);
129 if (not cert)
130 cert = certStore.getCertificate(deviceId.toString());
131 if (cert) {
132 connectionInfo["peer"] = cert->issuer->getId().toString();
133 }
134 if (socket_) {
135 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
136 connectionInfo["remoteAddress"] = socket_->getRemoteAddress();
137 } else if (tls_) {
138 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
139 connectionInfo["remoteAddress"] = tls_->getRemoteAddress();
140 } else if(ice_) {
141 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
142 connectionInfo["remoteAddress"] = ice_->getRemoteAddress(ICE_COMP_ID_SIP_TRANSPORT);
143 }
144 return connectionInfo;
145 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400146};
147
Adrien Béraud75754b22023-10-17 09:16:06 -0400148struct PendingCb {
149 std::string name;
150 ConnectCallback cb;
Adrien Béraudb941e922023-10-16 12:56:14 -0400151 bool requested {false};
Adrien Béraud75754b22023-10-17 09:16:06 -0400152};
153
154struct DeviceInfo {
155 const DeviceId deviceId;
156 mutable std::mutex mtx_ {};
157 std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info;
158 std::map<dht::Value::Id, PendingCb> connecting;
159 std::map<dht::Value::Id, PendingCb> waiting;
160 DeviceInfo(DeviceId id) : deviceId {id} {}
161
162 inline bool isConnecting() const {
163 return !connecting.empty() || !waiting.empty();
164 }
165
166 inline bool empty() const {
167 return info.empty() && connecting.empty() && waiting.empty();
168 }
169
170 dht::Value::Id newId(std::mt19937_64& rand) const {
171 ValueIdDist dist(1, ID_MAX_VAL);
172 dht::Value::Id id;
173 do {
174 id = dist(rand);
175 } while (info.find(id) != info.end()
176 || connecting.find(id) != connecting.end()
177 || waiting.find(id) != waiting.end());
178 return id;
179 }
180
181 std::shared_ptr<ConnectionInfo> getConnectedInfo() const {
182 for (auto& [id, ci] : info) {
183 if (ci->socket_)
184 return ci;
185 }
186 return {};
187 }
188
189 std::vector<PendingCb> extractPendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
190 {
191 std::vector<PendingCb> ret;
192 if (vid == 0) {
193 // Extract all pending callbacks
194 ret.reserve(connecting.size() + waiting.size());
195 for (auto& [vid, cb] : connecting)
196 ret.emplace_back(std::move(cb));
197 connecting.clear();
198 for (auto& [vid, cb] : waiting)
199 ret.emplace_back(std::move(cb));
200 waiting.clear();
201 } else if (auto n = waiting.extract(vid)) {
202 // If it's a waiting operation, just move it
203 ret.emplace_back(std::move(n.mapped()));
204 } else if (auto n = connecting.extract(vid)) {
205 ret.emplace_back(std::move(n.mapped()));
206 // If sock is nullptr, execute if it's the last connecting operation
207 // If accepted is false, it means that underlying socket is ok, but channel is declined
208 if (!sock && connecting.empty() && accepted) {
209 for (auto& [vid, cb] : waiting)
210 ret.emplace_back(std::move(cb));
211 waiting.clear();
212 for (auto& [vid, cb] : connecting)
213 ret.emplace_back(std::move(cb));
214 connecting.clear();
215 }
216 }
217 return ret;
218 }
219
220 std::vector<std::shared_ptr<ConnectionInfo>> extractUnusedConnections() {
221 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
222 for (auto& [id, info] : info)
223 unused.emplace_back(std::move(info));
224 info.clear();
225 return unused;
226 }
227
228 void executePendingOperations(std::unique_lock<std::mutex>& lock, dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) {
229 auto ops = extractPendingOperations(vid, sock, accepted);
230 lock.unlock();
231 for (auto& cb : ops)
232 cb.cb(sock, deviceId);
233 }
234 void executePendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) {
235 std::unique_lock<std::mutex> lock(mtx_);
236 executePendingOperations(lock, vid, sock, accepted);
237 }
238
Adrien Béraudb941e922023-10-16 12:56:14 -0400239 bool isConnecting(const std::string& name) const {
Adrien Béraud75754b22023-10-17 09:16:06 -0400240 for (const auto& [id, pc]: connecting)
Adrien Béraudb941e922023-10-16 12:56:14 -0400241 if (pc.name == name)
242 return true;
Adrien Béraud75754b22023-10-17 09:16:06 -0400243 for (const auto& [id, pc]: waiting)
Adrien Béraudb941e922023-10-16 12:56:14 -0400244 if (pc.name == name)
245 return true;
246 return false;
247 }
248 std::map<dht::Value::Id, std::string> requestPendingOps() {
249 std::map<dht::Value::Id, std::string> ret;
250 for (auto& [id, pc]: connecting) {
251 if (!pc.requested) {
252 ret[id] = pc.name;
253 pc.requested = true;
254 }
255 }
256 for (auto& [id, pc]: waiting) {
257 if (!pc.requested) {
258 ret[id] = pc.name;
259 pc.requested = true;
260 }
261 }
Adrien Béraud75754b22023-10-17 09:16:06 -0400262 return ret;
263 }
264
265 std::vector<std::map<std::string, std::string>>
266 getConnectionList(tls::CertificateStore& certStore) const {
267 std::lock_guard<std::mutex> lk(mtx_);
268 std::vector<std::map<std::string, std::string>> ret;
Adrien Béraudd5ec7a82023-10-28 18:07:03 -0400269 ret.reserve(info.size() + connecting.size() + waiting.size());
Adrien Béraud75754b22023-10-17 09:16:06 -0400270 for (auto& [id, ci] : info) {
271 std::lock_guard<std::mutex> lk(ci->mutex_);
272 ret.emplace_back(ci->getInfo(deviceId, id, certStore));
273 }
274 auto cert = certStore.getCertificate(deviceId.toString());
275 for (const auto& [vid, ci] : connecting) {
276 ret.emplace_back(std::map<std::string, std::string> {
277 {"id", callbackIdToString(deviceId, vid)},
278 {"status", std::to_string(static_cast<int>(ConnectionStatus::Connecting))},
279 {"device", deviceId.toString()},
280 {"peer", cert ? cert->issuer->getId().toString() : ""}
281 });
282 }
283 for (const auto& [vid, ci] : waiting) {
284 ret.emplace_back(std::map<std::string, std::string> {
285 {"id", callbackIdToString(deviceId, vid)},
286 {"status", std::to_string(static_cast<int>(ConnectionStatus::Waiting))},
287 {"device", deviceId.toString()},
288 {"peer", cert ? cert->issuer->getId().toString() : ""}
289 });
290 }
291 return ret;
292 }
293};
294
295class DeviceInfoSet {
296public:
297 std::shared_ptr<DeviceInfo> getDeviceInfo(const DeviceId& deviceId) {
298 std::lock_guard<std::mutex> lk(mtx_);
299 auto it = infos_.find(deviceId);
300 if (it != infos_.end())
301 return it->second;
302 return {};
303 }
304
305 std::vector<std::shared_ptr<DeviceInfo>> getDeviceInfos() {
306 std::vector<std::shared_ptr<DeviceInfo>> deviceInfos;
307 std::lock_guard<std::mutex> lk(mtx_);
308 deviceInfos.reserve(infos_.size());
309 for (auto& [deviceId, info] : infos_)
310 deviceInfos.emplace_back(info);
311 return deviceInfos;
312 }
313
314 std::shared_ptr<DeviceInfo> createDeviceInfo(const DeviceId& deviceId) {
315 std::lock_guard<std::mutex> lk(mtx_);
316 auto& info = infos_[deviceId];
317 if (!info)
318 info = std::make_shared<DeviceInfo>(deviceId);
319 return info;
320 }
321
322 bool removeDeviceInfo(const DeviceId& deviceId) {
323 std::lock_guard<std::mutex> lk(mtx_);
324 return infos_.erase(deviceId) != 0;
325 }
326
327 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) {
328 if (auto info = getDeviceInfo(deviceId)) {
329 std::lock_guard<std::mutex> lk(info->mtx_);
330 auto it = info->info.find(id);
331 if (it != info->info.end())
332 return it->second;
333 }
334 return {};
335 }
336
337 std::vector<std::shared_ptr<ConnectionInfo>> getConnectedInfos() {
338 auto deviceInfos = getDeviceInfos();
339 std::vector<std::shared_ptr<ConnectionInfo>> ret;
340 ret.reserve(deviceInfos.size());
341 for (auto& info : deviceInfos) {
342 std::lock_guard<std::mutex> lk(info->mtx_);
343 for (auto& [id, ci] : info->info) {
344 if (ci->socket_)
345 ret.emplace_back(ci);
346 }
347 }
348 return ret;
349 }
350 std::vector<std::shared_ptr<DeviceInfo>> shutdown() {
351 std::vector<std::shared_ptr<DeviceInfo>> ret;
352 std::lock_guard<std::mutex> lk(mtx_);
353 ret.reserve(infos_.size());
354 for (auto& [deviceId, info] : infos_) {
355 ret.emplace_back(std::move(info));
356 }
357 infos_.clear();
358 return ret;
359 }
360
361private:
362 std::mutex mtx_ {};
363 std::map<DeviceId, std::shared_ptr<DeviceInfo>> infos_ {};
364};
365
366
Adrien Béraud612b55b2023-05-29 10:42:04 -0400367/**
368 * returns whether or not UPnP is enabled and active_
369 * ie: if it is able to make port mappings
370 */
371bool
372ConnectionManager::Config::getUPnPActive() const
373{
374 if (upnpCtrl)
375 return upnpCtrl->isReady();
376 return false;
377}
378
379class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
380{
381public:
382 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
Amna81221ad2023-09-14 17:33:26 -0400383 : config_ {std::move(createConfig(config_))}
Adrien Béraudd8b6a402023-12-08 14:19:25 -0500384 , rand_ {config_->rng ? *config_->rng : dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400385 {
Kateryna Kostiukbb300a12023-10-02 13:19:50 -0400386 loadTreatedMessages();
Amna81221ad2023-09-14 17:33:26 -0400387 if(!config_->ioContext) {
388 config_->ioContext = std::make_shared<asio::io_context>();
389 ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
390 try {
391 auto work = asio::make_work_guard(*context);
392 context->run();
393 } catch (const std::exception& ex) {
394 if (l) l->error("Exception: {}", ex.what());
395 }
396 });
397 }
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400398 }
Amna81221ad2023-09-14 17:33:26 -0400399 ~Impl() {
400 if (ioContextRunner_) {
401 if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
402 config_->ioContext->stop();
403 ioContextRunner_->join();
404 ioContextRunner_.reset();
405 }
406 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400407
408 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
409 const dht::crypto::Identity& identity() const { return config_->id; }
410
Adrien Béraud75754b22023-10-17 09:16:06 -0400411 void shutdown()
Adrien Béraud612b55b2023-05-29 10:42:04 -0400412 {
Adrien Béraud75754b22023-10-17 09:16:06 -0400413 if (isDestroying_.exchange(true))
414 return;
415 std::vector<std::shared_ptr<ConnectionInfo>> unused;
416 std::vector<std::pair<DeviceId, std::vector<PendingCb>>> pending;
417 for (auto& dinfo: infos_.shutdown()) {
418 std::lock_guard<std::mutex> lk(dinfo->mtx_);
419 auto p = dinfo->extractPendingOperations(0, nullptr, false);
420 if (!p.empty())
421 pending.emplace_back(dinfo->deviceId, std::move(p));
422 auto uc = dinfo->extractUnusedConnections();
423 unused.insert(unused.end(), std::make_move_iterator(uc.begin()), std::make_move_iterator(uc.end()));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400424 }
Adrien Béraud75754b22023-10-17 09:16:06 -0400425 for (auto& info: unused)
426 info->shutdown();
427 for (auto& op: pending)
428 for (auto& cb: op.second)
429 cb.cb(nullptr, op.first);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400430 if (!unused.empty())
Amna81221ad2023-09-14 17:33:26 -0400431 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
432 infos.clear();
433 });
Adrien Béraud612b55b2023-05-29 10:42:04 -0400434 }
435
Adrien Béraud75754b22023-10-17 09:16:06 -0400436 void connectDeviceStartIce(const std::shared_ptr<ConnectionInfo>& info,
437 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400438 const dht::Value::Id& vid,
439 const std::string& connType,
440 std::function<void(bool)> onConnected);
Adrien Béraud75754b22023-10-17 09:16:06 -0400441 void onResponse(const asio::error_code& ec, const std::weak_ptr<ConnectionInfo>& info, const DeviceId& deviceId, const dht::Value::Id& vid);
442 bool connectDeviceOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo,
443 const std::shared_ptr<ConnectionInfo>& info,
444 const DeviceId& deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400445 const std::string& name,
446 const dht::Value::Id& vid,
447 const std::shared_ptr<dht::crypto::Certificate>& cert);
448 void connectDevice(const DeviceId& deviceId,
449 const std::string& uri,
450 ConnectCallback cb,
451 bool noNewSocket = false,
452 bool forceNewSocket = false,
453 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400454 void connectDevice(const dht::InfoHash& deviceId,
455 const std::string& uri,
456 ConnectCallbackLegacy cb,
457 bool noNewSocket = false,
458 bool forceNewSocket = false,
459 const std::string& connType = "");
460
Adrien Béraud612b55b2023-05-29 10:42:04 -0400461 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
462 const std::string& name,
463 ConnectCallback cb,
464 bool noNewSocket = false,
465 bool forceNewSocket = false,
466 const std::string& connType = "");
467 /**
468 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
469 * @param sock socket used to send the request
470 * @param name channel's name
471 * @param vid channel's id
472 * @param deviceId to identify the linked ConnectCallback
473 */
Adrien Béraud75754b22023-10-17 09:16:06 -0400474 void sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfo,
Adrien Bérauda9ef2a52023-11-05 00:47:24 -0400475 const std::weak_ptr<ConnectionInfo>& cinfo,
Adrien Béraud75754b22023-10-17 09:16:06 -0400476 const std::shared_ptr<MultiplexedSocket>& sock,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400477 const std::string& name,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400478 const dht::Value::Id& vid);
479 /**
480 * Triggered when a PeerConnectionRequest comes from the DHT
481 */
482 void answerTo(IceTransport& ice,
483 const dht::Value::Id& id,
484 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
Adrien Béraud75754b22023-10-17 09:16:06 -0400485 bool onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req);
486 bool onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400487 void onDhtPeerRequest(const PeerConnectionRequest& req,
488 const std::shared_ptr<dht::crypto::Certificate>& cert);
Adrien Béraud75754b22023-10-17 09:16:06 -0400489 /**
490 * Triggered when a new TLS socket is ready to use
491 * @param ok If succeed
492 * @param deviceId Related device
493 * @param vid vid of the connection request
494 * @param name non empty if TLS was created by connectDevice()
495 */
496 void onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo,
497 const std::shared_ptr<ConnectionInfo>& info,
498 bool ok,
499 const DeviceId& deviceId,
500 const dht::Value::Id& vid,
501 const std::string& name = "");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400502
Adrien Béraud75754b22023-10-17 09:16:06 -0400503 void addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info);
Adrien Béraud1addf952023-09-30 17:38:35 -0400504 void onPeerResponse(PeerConnectionRequest&& req);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400505 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
506
Adrien Béraud75754b22023-10-17 09:16:06 -0400507
Adrien Béraud612b55b2023-05-29 10:42:04 -0400508 const std::shared_future<tls::DhParams> dhParams() const;
509 tls::CertificateStore& certStore() const { return *config_->certStore; }
510
511 mutable std::mutex messageMutex_ {};
512 std::set<std::string, std::less<>> treatedMessages_ {};
513
514 void loadTreatedMessages();
515 void saveTreatedMessages() const;
516
517 /// \return true if the given DHT message identifier has been treated
518 /// \note if message has not been treated yet this method st/ore this id and returns true at
519 /// further calls
520 bool isMessageTreated(std::string_view id);
521
522 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
523
524 /**
525 * Published IPv4/IPv6 addresses, used only if defined by the user in account
526 * configuration
527 *
528 */
529 IpAddr publishedIp_[2] {};
530
Adrien Béraud612b55b2023-05-29 10:42:04 -0400531 /**
532 * interface name on which this account is bound
533 */
534 std::string interface_ {"default"};
535
536 /**
537 * Get the local interface name on which this account is bound.
538 */
539 const std::string& getLocalInterface() const { return interface_; }
540
541 /**
542 * Get the published IP address, fallbacks to NAT if family is unspecified
543 * Prefers the usage of IPv4 if possible.
544 */
545 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
546
547 /**
548 * Set published IP address according to given family
549 */
550 void setPublishedAddress(const IpAddr& ip_addr);
551
552 /**
553 * Store the local/public addresses used to register
554 */
555 void storeActiveIpAddress(std::function<void()>&& cb = {});
556
557 /**
558 * Create and return ICE options.
559 */
560 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
561 IceTransportOptions getIceOptions() const noexcept;
562
563 /**
564 * Inform that a potential peer device have been found.
565 * Returns true only if the device certificate is a valid device certificate.
566 * In that case (true is returned) the account_id parameter is set to the peer account ID.
567 */
568 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
569 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
570
571 bool findCertificate(const dht::PkId& id,
572 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400573 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400574
575 /**
576 * returns whether or not UPnP is enabled and active
577 * ie: if it is able to make port mappings
578 */
579 bool getUPnPActive() const;
580
Adrien Béraud612b55b2023-05-29 10:42:04 -0400581 std::shared_ptr<ConnectionManager::Config> config_;
Amna81221ad2023-09-14 17:33:26 -0400582 std::unique_ptr<std::thread> ioContextRunner_;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400583
Adrien Béraud75754b22023-10-17 09:16:06 -0400584 mutable std::mutex randMtx_;
585 mutable std::mt19937_64 rand_;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400586
587 iOSConnectedCallback iOSConnectedCb_ {};
588
Adrien Béraud75754b22023-10-17 09:16:06 -0400589 DeviceInfoSet infos_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400590
591 ChannelRequestCallback channelReqCb_ {};
592 ConnectionReadyCallback connReadyCb_ {};
593 onICERequestCallback iceReqCb_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400594 std::atomic_bool isDestroying_ {false};
595};
596
597void
598ConnectionManager::Impl::connectDeviceStartIce(
Adrien Béraud75754b22023-10-17 09:16:06 -0400599 const std::shared_ptr<ConnectionInfo>& info,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400600 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
601 const dht::Value::Id& vid,
602 const std::string& connType,
603 std::function<void(bool)> onConnected)
604{
605 auto deviceId = devicePk->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400606 if (!info) {
607 onConnected(false);
608 return;
609 }
610
611 std::unique_lock<std::mutex> lk(info->mutex_);
612 auto& ice = info->ice_;
613
614 if (!ice) {
615 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400616 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400617 onConnected(false);
618 return;
619 }
620
621 auto iceAttributes = ice->getLocalAttributes();
622 std::ostringstream icemsg;
623 icemsg << iceAttributes.ufrag << "\n";
624 icemsg << iceAttributes.pwd << "\n";
625 for (const auto& addr : ice->getLocalCandidates(1)) {
626 icemsg << addr << "\n";
627 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400628 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400629 }
630
631 // Prepare connection request as a DHT message
632 PeerConnectionRequest val;
633
634 val.id = vid; /* Random id for the message unicity */
635 val.ice_msg = icemsg.str();
636 val.connType = connType;
637
638 auto value = std::make_shared<dht::Value>(std::move(val));
639 value->user_type = "peer_request";
640
641 // Send connection request through DHT
642 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400643 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400644 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
645 + devicePk->getId().toString()),
646 devicePk,
647 value,
648 [l=config_->logger,deviceId](bool ok) {
649 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400650 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400651 deviceId,
652 (ok ? "ok" : "failed"));
653 });
654 // Wait for call to onResponse() operated by DHT
655 if (isDestroying_) {
656 onConnected(true); // This avoid to wait new negotiation when destroying
657 return;
658 }
659
660 info->onConnected_ = std::move(onConnected);
661 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
662 std::chrono::steady_clock::now()
663 + DHT_MSG_TIMEOUT);
664 info->waitForAnswer_->async_wait(
Adrien Béraud75754b22023-10-17 09:16:06 -0400665 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, info, deviceId, vid));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400666}
667
668void
669ConnectionManager::Impl::onResponse(const asio::error_code& ec,
Adrien Béraud75754b22023-10-17 09:16:06 -0400670 const std::weak_ptr<ConnectionInfo>& winfo,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400671 const DeviceId& deviceId,
672 const dht::Value::Id& vid)
673{
674 if (ec == asio::error::operation_aborted)
675 return;
Adrien Béraud75754b22023-10-17 09:16:06 -0400676 auto info = winfo.lock();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400677 if (!info)
678 return;
679
680 std::unique_lock<std::mutex> lk(info->mutex_);
681 auto& ice = info->ice_;
682 if (isDestroying_) {
683 info->onConnected_(true); // The destructor can wake a pending wait here.
684 return;
685 }
686 if (!info->responseReceived_) {
687 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400688 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400689 info->onConnected_(false);
690 return;
691 }
692
693 if (!info->ice_) {
694 info->onConnected_(false);
695 return;
696 }
697
698 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
699
700 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
701 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400702 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400703 info->onConnected_(false);
704 return;
705 }
706 info->onConnected_(true);
707}
708
709bool
710ConnectionManager::Impl::connectDeviceOnNegoDone(
Adrien Béraud75754b22023-10-17 09:16:06 -0400711 const std::weak_ptr<DeviceInfo>& dinfo,
712 const std::shared_ptr<ConnectionInfo>& info,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400713 const DeviceId& deviceId,
714 const std::string& name,
715 const dht::Value::Id& vid,
716 const std::shared_ptr<dht::crypto::Certificate>& cert)
717{
Adrien Béraud612b55b2023-05-29 10:42:04 -0400718 if (!info)
719 return false;
720
721 std::unique_lock<std::mutex> lk {info->mutex_};
722 if (info->waitForAnswer_) {
723 // Negotiation is done and connected, go to handshake
724 // and avoid any cancellation at this point.
725 info->waitForAnswer_->cancel();
726 }
727 auto& ice = info->ice_;
728 if (!ice || !ice->isRunning()) {
729 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400730 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400731 return false;
732 }
733
734 // Build socket
735 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
736 std::move(ice)),
737 true);
738
739 // Negotiate a TLS session
740 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400741 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400742 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
743 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400744 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400745 identity(),
746 dhParams(),
747 *cert);
748
749 info->tls_->setOnReady(
Adrien Béraud75754b22023-10-17 09:16:06 -0400750 [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
Adrien Béraud612b55b2023-05-29 10:42:04 -0400751 bool ok) {
Andreas Traczykb23278d2023-12-11 16:14:00 -0500752 if (auto shared = w.lock()) {
753 if (auto info = winfo.lock()) {
754 shared->onTlsNegotiationDone(dinfo.lock(), info, ok, deviceId, vid, name);
755 // Make sure there's another reference to info to avoid destruction when
756 // we leave this scope (would lead to a deadlock on cbMtx_).
757 dht::ThreadPool::io().run([info = std::move(info)] {});
758 }
759 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400760 });
761 return true;
762}
763
764void
765ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
766 const std::string& name,
767 ConnectCallback cb,
768 bool noNewSocket,
769 bool forceNewSocket,
770 const std::string& connType)
771{
772 if (!dht()) {
773 cb(nullptr, deviceId);
774 return;
775 }
776 if (deviceId.toString() == identity().second->getLongId().toString()) {
777 cb(nullptr, deviceId);
778 return;
779 }
780 findCertificate(deviceId,
Adrien Béraud75754b22023-10-17 09:16:06 -0400781 [w = weak_from_this(),
Adrien Béraud612b55b2023-05-29 10:42:04 -0400782 deviceId,
783 name,
784 cb = std::move(cb),
785 noNewSocket,
786 forceNewSocket,
787 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
788 if (!cert) {
789 if (auto shared = w.lock())
790 if (shared->config_->logger)
791 shared->config_->logger->error(
792 "No valid certificate found for device {}",
793 deviceId);
794 cb(nullptr, deviceId);
795 return;
796 }
797 if (auto shared = w.lock()) {
798 shared->connectDevice(cert,
799 name,
800 std::move(cb),
801 noNewSocket,
802 forceNewSocket,
803 connType);
804 } else
805 cb(nullptr, deviceId);
806 });
807}
808
809void
Amna0cf544d2023-07-25 14:25:09 -0400810ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
811 const std::string& name,
812 ConnectCallbackLegacy cb,
813 bool noNewSocket,
814 bool forceNewSocket,
815 const std::string& connType)
816{
817 if (!dht()) {
818 cb(nullptr, deviceId);
819 return;
820 }
821 if (deviceId.toString() == identity().second->getLongId().toString()) {
822 cb(nullptr, deviceId);
823 return;
824 }
825 findCertificate(deviceId,
Adrien Béraud75754b22023-10-17 09:16:06 -0400826 [w = weak_from_this(),
Amna0cf544d2023-07-25 14:25:09 -0400827 deviceId,
828 name,
829 cb = std::move(cb),
830 noNewSocket,
831 forceNewSocket,
832 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
833 if (!cert) {
834 if (auto shared = w.lock())
835 if (shared->config_->logger)
836 shared->config_->logger->error(
837 "No valid certificate found for device {}",
838 deviceId);
839 cb(nullptr, deviceId);
840 return;
841 }
842 if (auto shared = w.lock()) {
843 shared->connectDevice(cert,
844 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400845 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400846 cb(sock, deviceId);
847 },
848 noNewSocket,
849 forceNewSocket,
850 connType);
851 } else
852 cb(nullptr, deviceId);
853 });
854}
855
856void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400857ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
858 const std::string& name,
859 ConnectCallback cb,
860 bool noNewSocket,
861 bool forceNewSocket,
862 const std::string& connType)
863{
864 // Avoid dht operation in a DHT callback to avoid deadlocks
Adrien Béraud75754b22023-10-17 09:16:06 -0400865 dht::ThreadPool::computation().run([w = weak_from_this(),
Adrien Béraud612b55b2023-05-29 10:42:04 -0400866 name = std::move(name),
867 cert = std::move(cert),
868 cb = std::move(cb),
869 noNewSocket,
870 forceNewSocket,
871 connType] {
872 auto devicePk = cert->getSharedPublicKey();
873 auto deviceId = devicePk->getLongId();
874 auto sthis = w.lock();
875 if (!sthis || sthis->isDestroying_) {
876 cb(nullptr, deviceId);
877 return;
878 }
Adrien Béraud75754b22023-10-17 09:16:06 -0400879 auto di = sthis->infos_.createDeviceInfo(deviceId);
880 std::unique_lock<std::mutex> lk(di->mtx_);
881
Adrien Béraud26365c92023-09-23 23:42:43 -0400882 dht::Value::Id vid;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400883 {
Adrien Béraud75754b22023-10-17 09:16:06 -0400884 std::lock_guard<std::mutex> lkr(sthis->randMtx_);
885 vid = di->newId(sthis->rand_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400886 }
887
Adrien Béraud75754b22023-10-17 09:16:06 -0400888 // Check if already connecting
889 auto isConnectingToDevice = di->isConnecting();
890 // Note: we can be in a state where first
891 // socket is negotiated and first channel is pending
892 // so return only after we checked the info
Adrien Béraudb941e922023-10-16 12:56:14 -0400893 auto& diw = (isConnectingToDevice && !forceNewSocket)
894 ? di->waiting[vid]
895 : di->connecting[vid];
896 diw = PendingCb {name, std::move(cb)};
897
Adrien Béraud612b55b2023-05-29 10:42:04 -0400898 // Check if already negotiated
Adrien Béraud75754b22023-10-17 09:16:06 -0400899 if (auto info = di->getConnectedInfo()) {
900 std::unique_lock<std::mutex> lkc(info->mutex_);
901 if (auto sock = info->socket_) {
902 info->cbIds_.emplace(vid);
Adrien Béraudb941e922023-10-16 12:56:14 -0400903 diw.requested = true;
Adrien Béraud75754b22023-10-17 09:16:06 -0400904 lkc.unlock();
905 lk.unlock();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400906 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400907 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Bérauda9ef2a52023-11-05 00:47:24 -0400908 sthis->sendChannelRequest(di, info, sock, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400909 return;
910 }
911 }
912
913 if (isConnectingToDevice && !forceNewSocket) {
914 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400915 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400916 return;
917 }
918 if (noNewSocket) {
919 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud75754b22023-10-17 09:16:06 -0400920 di->executePendingOperations(lk, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400921 return;
922 }
923
924 // Note: used when the ice negotiation fails to erase
925 // all stored structures.
Adrien Béraud75754b22023-10-17 09:16:06 -0400926 auto eraseInfo = [w, diw=std::weak_ptr(di), vid] {
927 if (auto di = diw.lock()) {
928 std::unique_lock<std::mutex> lk(di->mtx_);
929 di->info.erase(vid);
930 auto ops = di->extractPendingOperations(vid, nullptr);
931 if (di->empty()) {
932 if (auto shared = w.lock())
933 shared->infos_.removeDeviceInfo(di->deviceId);
934 }
935 lk.unlock();
936 for (const auto& op: ops)
937 op.cb(nullptr, di->deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400938 }
939 };
940
941 // If no socket exists, we need to initiate an ICE connection.
942 sthis->getIceOptions([w,
943 deviceId = std::move(deviceId),
944 devicePk = std::move(devicePk),
Adrien Béraud75754b22023-10-17 09:16:06 -0400945 diw=std::weak_ptr(di),
Adrien Béraud612b55b2023-05-29 10:42:04 -0400946 name = std::move(name),
947 cert = std::move(cert),
948 vid,
949 connType,
950 eraseInfo](auto&& ice_config) {
951 auto sthis = w.lock();
952 if (!sthis) {
953 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
954 return;
955 }
Adrien Béraud75754b22023-10-17 09:16:06 -0400956 auto info = std::make_shared<ConnectionInfo>();
957 auto winfo = std::weak_ptr(info);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400958 ice_config.tcpEnable = true;
959 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400960 devicePk = std::move(devicePk),
961 name = std::move(name),
962 cert = std::move(cert),
Adrien Béraud75754b22023-10-17 09:16:06 -0400963 diw,
964 winfo = std::weak_ptr(info),
Adrien Béraud612b55b2023-05-29 10:42:04 -0400965 vid,
966 connType,
967 eraseInfo](bool ok) {
968 dht::ThreadPool::io().run([w = std::move(w),
969 devicePk = std::move(devicePk),
Adrien Béraud75754b22023-10-17 09:16:06 -0400970 vid,
971 winfo,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400972 eraseInfo,
973 connType, ok] {
974 auto sthis = w.lock();
975 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400976 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400977 if (!sthis || !ok) {
978 eraseInfo();
979 return;
980 }
Adrien Béraud75754b22023-10-17 09:16:06 -0400981 sthis->connectDeviceStartIce(winfo.lock(), devicePk, vid, connType, [=](bool ok) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400982 if (!ok) {
983 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
984 }
985 });
986 });
987 };
988 ice_config.onNegoDone = [w,
989 deviceId,
990 name,
991 cert = std::move(cert),
Adrien Béraud75754b22023-10-17 09:16:06 -0400992 diw,
993 winfo = std::weak_ptr(info),
Adrien Béraud612b55b2023-05-29 10:42:04 -0400994 vid,
995 eraseInfo](bool ok) {
996 dht::ThreadPool::io().run([w = std::move(w),
997 deviceId = std::move(deviceId),
998 name = std::move(name),
999 cert = std::move(cert),
Adrien Béraud75754b22023-10-17 09:16:06 -04001000 diw = std::move(diw),
1001 winfo = std::move(winfo),
Adrien Béraud612b55b2023-05-29 10:42:04 -04001002 vid = std::move(vid),
1003 eraseInfo = std::move(eraseInfo),
1004 ok] {
1005 auto sthis = w.lock();
1006 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001007 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud75754b22023-10-17 09:16:06 -04001008 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(diw, winfo.lock(), deviceId, name, vid, cert))
Adrien Béraud612b55b2023-05-29 10:42:04 -04001009 eraseInfo();
1010 });
1011 };
1012
Adrien Béraud75754b22023-10-17 09:16:06 -04001013 if (auto di = diw.lock()) {
1014 std::lock_guard<std::mutex> lk(di->mtx_);
1015 di->info[vid] = info;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001016 }
1017 std::unique_lock<std::mutex> lk {info->mutex_};
1018 ice_config.master = false;
1019 ice_config.streamsCount = 1;
1020 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -04001021 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001022 if (!info->ice_) {
1023 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001024 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001025 eraseInfo();
1026 return;
1027 }
1028 // We need to detect any shutdown if the ice session is destroyed before going to the
1029 // TLS session;
1030 info->ice_->setOnShutdown([eraseInfo]() {
1031 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1032 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001033 try {
1034 info->ice_->initIceInstance(ice_config);
1035 } catch (const std::exception& e) {
1036 if (sthis->config_->logger)
1037 sthis->config_->logger->error("{}", e.what());
1038 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1039 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001040 });
1041 });
1042}
1043
1044void
Adrien Bérauda9ef2a52023-11-05 00:47:24 -04001045ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfow,
1046 const std::weak_ptr<ConnectionInfo>& cinfow,
Adrien Béraud75754b22023-10-17 09:16:06 -04001047 const std::shared_ptr<MultiplexedSocket>& sock,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001048 const std::string& name,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001049 const dht::Value::Id& vid)
1050{
1051 auto channelSock = sock->addChannel(name);
Adrien Béraud9a4e98b2023-10-15 12:10:21 -04001052 if (!channelSock) {
1053 if (config_->logger)
1054 config_->logger->error("sendChannelRequest failed - cannot create channel");
Adrien Bérauda9ef2a52023-11-05 00:47:24 -04001055 if (auto info = dinfow.lock())
Adrien Béraud9a4e98b2023-10-15 12:10:21 -04001056 info->executePendingOperations(vid, nullptr);
1057 return;
1058 }
Adrien Bérauda9ef2a52023-11-05 00:47:24 -04001059 channelSock->onShutdown([dinfow, name, vid] {
1060 if (auto info = dinfow.lock())
Adrien Béraud75754b22023-10-17 09:16:06 -04001061 info->executePendingOperations(vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001062 });
1063 channelSock->onReady(
Adrien Bérauda9ef2a52023-11-05 00:47:24 -04001064 [dinfow, cinfow, wSock = std::weak_ptr(channelSock), name, vid](bool accepted) {
1065 if (auto dinfo = dinfow.lock()) {
1066 dinfo->executePendingOperations(vid, accepted ? wSock.lock() : nullptr, accepted);
1067 if (auto cinfo = cinfow.lock()) {
1068 std::lock_guard<std::mutex> lk(cinfo->mutex_);
1069 cinfo->cbIds_.erase(vid);
1070 }
1071 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001072 });
1073
1074 ChannelRequest val;
1075 val.name = channelSock->name();
1076 val.state = ChannelRequestState::REQUEST;
1077 val.channel = channelSock->channel();
1078 msgpack::sbuffer buffer(256);
1079 msgpack::pack(buffer, val);
1080
1081 std::error_code ec;
1082 int res = sock->write(CONTROL_CHANNEL,
1083 reinterpret_cast<const uint8_t*>(buffer.data()),
1084 buffer.size(),
1085 ec);
1086 if (res < 0) {
1087 // TODO check if we should handle errors here
1088 if (config_->logger)
Adrien Béraud75754b22023-10-17 09:16:06 -04001089 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001090 }
1091}
1092
1093void
Adrien Béraud1addf952023-09-30 17:38:35 -04001094ConnectionManager::Impl::onPeerResponse(PeerConnectionRequest&& req)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001095{
1096 auto device = req.owner->getLongId();
Adrien Béraud75754b22023-10-17 09:16:06 -04001097 if (auto info = infos_.getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -04001098 if (config_->logger)
1099 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001100 std::lock_guard<std::mutex> lk {info->mutex_};
1101 info->responseReceived_ = true;
1102 info->response_ = std::move(req);
1103 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
1104 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
1105 this,
1106 std::placeholders::_1,
Adrien Béraud75754b22023-10-17 09:16:06 -04001107 std::weak_ptr(info),
Adrien Béraud612b55b2023-05-29 10:42:04 -04001108 device,
1109 req.id));
1110 } else {
1111 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001112 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001113 }
1114}
1115
1116void
1117ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1118{
1119 if (!dht())
1120 return;
1121 dht()->listen<PeerConnectionRequest>(
1122 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
Adrien Béraud75754b22023-10-17 09:16:06 -04001123 [w = weak_from_this()](PeerConnectionRequest&& req) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001124 auto shared = w.lock();
1125 if (!shared)
1126 return false;
1127 if (shared->isMessageTreated(to_hex_string(req.id))) {
1128 // Message already treated. Just ignore
1129 return true;
1130 }
1131 if (req.isAnswer) {
1132 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001133 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001134 } else {
1135 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001136 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001137 }
1138 if (req.isAnswer) {
Adrien Béraud1addf952023-09-30 17:38:35 -04001139 shared->onPeerResponse(std::move(req));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001140 } else {
1141 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -04001142 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -04001143 req.from,
1144 [w, req = std::move(req)](
1145 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
1146 auto shared = w.lock();
1147 if (!shared)
1148 return;
1149 dht::InfoHash peer_h;
1150 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
1151#if TARGET_OS_IOS
1152 if (shared->iOSConnectedCb_(req.connType, peer_h))
1153 return;
1154#endif
1155 shared->onDhtPeerRequest(req, cert);
1156 } else {
1157 if (shared->config_->logger)
1158 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -04001159 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001160 req.owner->getLongId());
1161 }
1162 });
1163 }
1164
1165 return true;
1166 },
1167 dht::Value::UserTypeFilter("peer_request"));
1168}
1169
1170void
Adrien Béraud75754b22023-10-17 09:16:06 -04001171ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo,
1172 const std::shared_ptr<ConnectionInfo>& info,
1173 bool ok,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001174 const DeviceId& deviceId,
1175 const dht::Value::Id& vid,
1176 const std::string& name)
1177{
1178 if (isDestroying_)
1179 return;
1180 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
1181 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
1182 // asked yet)
1183 auto isDhtRequest = name.empty();
1184 if (!ok) {
1185 if (isDhtRequest) {
1186 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001187 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001188 deviceId,
1189 name,
1190 vid);
1191 if (connReadyCb_)
1192 connReadyCb_(deviceId, "", nullptr);
1193 } else {
1194 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001195 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001196 deviceId,
1197 name,
1198 vid);
Adrien Béraud75754b22023-10-17 09:16:06 -04001199 dinfo->executePendingOperations(vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001200 }
Sébastien Blin3cf0acc2023-10-23 09:45:32 -04001201
1202 std::unique_lock<std::mutex> lk(dinfo->mtx_);
1203 dinfo->info.erase(vid);
1204
1205 if (dinfo->empty()) {
1206 infos_.removeDeviceInfo(dinfo->deviceId);
1207 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001208 } else {
1209 // The socket is ready, store it
1210 if (isDhtRequest) {
1211 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001212 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001213 deviceId,
1214 vid);
1215 } else {
1216 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001217 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001218 deviceId,
1219 name,
1220 vid);
1221 }
1222
Adrien Béraud75754b22023-10-17 09:16:06 -04001223 // Note: do not remove pending there it's done in sendChannelRequest
1224 std::unique_lock<std::mutex> lk2 {dinfo->mtx_};
Adrien Béraudb941e922023-10-16 12:56:14 -04001225 auto pendingIds = dinfo->requestPendingOps();
Adrien Béraud75754b22023-10-17 09:16:06 -04001226 lk2.unlock();
1227 std::unique_lock<std::mutex> lk {info->mutex_};
1228 addNewMultiplexedSocket(dinfo, deviceId, vid, info);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001229 // Finally, open the channel and launch pending callbacks
Adrien Béraud75754b22023-10-17 09:16:06 -04001230 lk.unlock();
1231 for (const auto& [id, name]: pendingIds) {
1232 if (config_->logger)
1233 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
1234 deviceId, name);
Adrien Bérauda9ef2a52023-11-05 00:47:24 -04001235 sendChannelRequest(dinfo, info, info->socket_, name, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001236 }
1237 }
1238}
1239
1240void
1241ConnectionManager::Impl::answerTo(IceTransport& ice,
1242 const dht::Value::Id& id,
1243 const std::shared_ptr<dht::crypto::PublicKey>& from)
1244{
1245 // NOTE: This is a shortest version of a real SDP message to save some bits
1246 auto iceAttributes = ice.getLocalAttributes();
1247 std::ostringstream icemsg;
1248 icemsg << iceAttributes.ufrag << "\n";
1249 icemsg << iceAttributes.pwd << "\n";
1250 for (const auto& addr : ice.getLocalCandidates(1)) {
1251 icemsg << addr << "\n";
1252 }
1253
1254 // Send PeerConnection response
1255 PeerConnectionRequest val;
1256 val.id = id;
1257 val.ice_msg = icemsg.str();
1258 val.isAnswer = true;
1259 auto value = std::make_shared<dht::Value>(std::move(val));
1260 value->user_type = "peer_request";
1261
1262 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001263 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001264 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1265 + from->getId().toString()),
1266 from,
1267 value,
1268 [from,l=config_->logger](bool ok) {
1269 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001270 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001271 from->getLongId(),
1272 (ok ? "ok" : "failed"));
1273 });
1274}
1275
1276bool
Adrien Béraud75754b22023-10-17 09:16:06 -04001277ConnectionManager::Impl::onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001278{
Adrien Béraud612b55b2023-05-29 10:42:04 -04001279 if (!info)
1280 return false;
1281
Adrien Béraud75754b22023-10-17 09:16:06 -04001282 auto deviceId = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -04001283 std::unique_lock<std::mutex> lk {info->mutex_};
1284 auto& ice = info->ice_;
1285 if (!ice) {
1286 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001287 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001288 if (connReadyCb_)
1289 connReadyCb_(deviceId, "", nullptr);
1290 return false;
1291 }
1292
1293 auto sdp = ice->parseIceCandidates(req.ice_msg);
1294 answerTo(*ice, req.id, req.owner);
1295 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1296 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001297 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001298 ice = nullptr;
1299 if (connReadyCb_)
1300 connReadyCb_(deviceId, "", nullptr);
1301 return false;
1302 }
1303 return true;
1304}
1305
1306bool
Adrien Béraud75754b22023-10-17 09:16:06 -04001307ConnectionManager::Impl::onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001308{
Adrien Béraud612b55b2023-05-29 10:42:04 -04001309 if (!info)
1310 return false;
1311
Adrien Béraud75754b22023-10-17 09:16:06 -04001312 auto deviceId = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -04001313 std::unique_lock<std::mutex> lk {info->mutex_};
1314 auto& ice = info->ice_;
1315 if (!ice) {
1316 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001317 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001318 return false;
1319 }
1320
1321 // Build socket
1322 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1323 std::move(ice)),
1324 false);
1325
1326 // init TLS session
1327 auto ph = req.from;
1328 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001329 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1330 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001331 req.id);
1332 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1333 std::move(endpoint),
1334 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001335 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001336 identity(),
1337 dhParams(),
Adrien Béraud75754b22023-10-17 09:16:06 -04001338 [ph, deviceId, w=weak_from_this(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001339 auto shared = w.lock();
1340 if (!shared)
1341 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001342 if (cert.getPublicKey().getId() != ph
1343 || deviceId != cert.getPublicKey().getLongId()) {
1344 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1345 deviceId,
1346 cert.getPublicKey().getLongId());
1347 return false;
1348 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001349 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1350 if (!crt)
1351 return false;
1352 return crt->getPacked() == cert.getPacked();
1353 });
1354
1355 info->tls_->setOnReady(
Adrien Béraud75754b22023-10-17 09:16:06 -04001356 [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001357 if (auto shared = w.lock())
Adrien Béraud75754b22023-10-17 09:16:06 -04001358 shared->onTlsNegotiationDone(dinfo.lock(), winfo.lock(), ok, deviceId, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001359 });
1360 return true;
1361}
1362
1363void
1364ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1365 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1366{
1367 auto deviceId = req.owner->getLongId();
1368 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001369 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001370 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1371 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001372 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001373 return;
1374 }
1375
1376 // Because the connection is accepted, create an ICE socket.
Adrien Béraud75754b22023-10-17 09:16:06 -04001377 getIceOptions([w = weak_from_this(), req, deviceId](auto&& ice_config) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001378 auto shared = w.lock();
1379 if (!shared)
1380 return;
Adrien Béraud75754b22023-10-17 09:16:06 -04001381
1382 auto di = shared->infos_.createDeviceInfo(deviceId);
1383 auto info = std::make_shared<ConnectionInfo>();
1384 auto wdi = std::weak_ptr(di);
1385 auto winfo = std::weak_ptr(info);
1386
Adrien Béraud612b55b2023-05-29 10:42:04 -04001387 // Note: used when the ice negotiation fails to erase
1388 // all stored structures.
Adrien Béraud75754b22023-10-17 09:16:06 -04001389 auto eraseInfo = [w, wdi, id = req.id] {
1390 auto shared = w.lock();
1391 if (auto di = wdi.lock()) {
1392 std::unique_lock<std::mutex> lk(di->mtx_);
1393 di->info.erase(id);
1394 auto ops = di->extractPendingOperations(id, nullptr);
1395 if (di->empty()) {
1396 if (shared)
1397 shared->infos_.removeDeviceInfo(di->deviceId);
1398 }
1399 lk.unlock();
1400 for (const auto& op: ops)
1401 op.cb(nullptr, di->deviceId);
1402 if (shared && shared->connReadyCb_)
1403 shared->connReadyCb_(di->deviceId, "", nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001404 }
1405 };
1406
Adrien Béraud75754b22023-10-17 09:16:06 -04001407 ice_config.master = true;
1408 ice_config.streamsCount = 1;
1409 ice_config.compCountPerStream = 1; // TCP
Adrien Béraud612b55b2023-05-29 10:42:04 -04001410 ice_config.tcpEnable = true;
Adrien Béraud75754b22023-10-17 09:16:06 -04001411 ice_config.onInitDone = [w, winfo, req, eraseInfo](bool ok) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001412 auto shared = w.lock();
1413 if (!shared)
1414 return;
1415 if (!ok) {
1416 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001417 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001418 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1419 return;
1420 }
1421
1422 dht::ThreadPool::io().run(
Adrien Béraud75754b22023-10-17 09:16:06 -04001423 [w = std::move(w), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1424 if (auto shared = w.lock()) {
1425 if (!shared->onRequestStartIce(winfo.lock(), req))
1426 eraseInfo();
1427 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001428 });
1429 };
1430
Adrien Béraud75754b22023-10-17 09:16:06 -04001431 ice_config.onNegoDone = [w, wdi, winfo, req, eraseInfo](bool ok) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001432 auto shared = w.lock();
1433 if (!shared)
1434 return;
1435 if (!ok) {
1436 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001437 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001438 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1439 return;
1440 }
1441
1442 dht::ThreadPool::io().run(
Adrien Béraud75754b22023-10-17 09:16:06 -04001443 [w = std::move(w), wdi = std::move(wdi), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001444 if (auto shared = w.lock())
Adrien Béraud75754b22023-10-17 09:16:06 -04001445 if (!shared->onRequestOnNegoDone(wdi.lock(), winfo.lock(), req))
Adrien Béraud612b55b2023-05-29 10:42:04 -04001446 eraseInfo();
1447 });
1448 };
1449
1450 // Negotiate a new ICE socket
Adrien Béraud612b55b2023-05-29 10:42:04 -04001451 {
Adrien Béraud75754b22023-10-17 09:16:06 -04001452 std::lock_guard<std::mutex> lk(di->mtx_);
1453 di->info[req.id] = info;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001454 }
Adrien Béraud75754b22023-10-17 09:16:06 -04001455
Adrien Béraud612b55b2023-05-29 10:42:04 -04001456 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001457 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001458 std::unique_lock<std::mutex> lk {info->mutex_};
Sébastien Blin34086512023-07-25 09:52:14 -04001459 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001460 if (not info->ice_) {
1461 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001462 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001463 eraseInfo();
1464 return;
1465 }
1466 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1467 info->ice_->setOnShutdown([eraseInfo]() {
1468 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1469 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001470 try {
1471 info->ice_->initIceInstance(ice_config);
1472 } catch (const std::exception& e) {
1473 if (shared->config_->logger)
1474 shared->config_->logger->error("{}", e.what());
1475 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1476 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001477 });
1478}
1479
1480void
Adrien Béraud75754b22023-10-17 09:16:06 -04001481ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001482{
Adrien Béraud75754b22023-10-17 09:16:06 -04001483 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, deviceId, std::move(info->tls_), config_->logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001484 info->socket_->setOnReady(
Adrien Béraud75754b22023-10-17 09:16:06 -04001485 [w = weak_from_this()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001486 if (auto sthis = w.lock())
1487 if (sthis->connReadyCb_)
1488 sthis->connReadyCb_(deviceId, socket->name(), socket);
1489 });
Adrien Béraud75754b22023-10-17 09:16:06 -04001490 info->socket_->setOnRequest([w = weak_from_this()](const std::shared_ptr<dht::crypto::Certificate>& peer,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001491 const uint16_t&,
1492 const std::string& name) {
1493 if (auto sthis = w.lock())
1494 if (sthis->channelReqCb_)
1495 return sthis->channelReqCb_(peer, name);
1496 return false;
1497 });
Adrien Béraud75754b22023-10-17 09:16:06 -04001498 info->socket_->onShutdown([dinfo, wi=std::weak_ptr(info), vid]() {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001499 // Cancel current outgoing connections
Adrien Béraud75754b22023-10-17 09:16:06 -04001500 dht::ThreadPool::io().run([dinfo, wi, vid] {
1501 std::set<dht::Value::Id> ids;
1502 if (auto info = wi.lock()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001503 std::lock_guard<std::mutex> lk(info->mutex_);
1504 if (info->socket_) {
1505 ids = std::move(info->cbIds_);
1506 info->socket_->shutdown();
1507 }
1508 }
Adrien Béraud75754b22023-10-17 09:16:06 -04001509 if (auto deviceInfo = dinfo.lock()) {
1510 std::shared_ptr<ConnectionInfo> info;
1511 std::vector<PendingCb> ops;
1512 std::unique_lock<std::mutex> lk(deviceInfo->mtx_);
1513 auto it = deviceInfo->info.find(vid);
1514 if (it != deviceInfo->info.end()) {
1515 info = std::move(it->second);
1516 deviceInfo->info.erase(it);
1517 }
1518 for (const auto& cbId : ids) {
1519 auto po = deviceInfo->extractPendingOperations(cbId, nullptr);
1520 ops.insert(ops.end(), po.begin(), po.end());
1521 }
1522 lk.unlock();
1523 for (auto& op : ops)
1524 op.cb(nullptr, deviceInfo->deviceId);
1525 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001526 });
1527 });
1528}
1529
1530const std::shared_future<tls::DhParams>
1531ConnectionManager::Impl::dhParams() const
1532{
1533 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001534 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001535}
1536
1537template<typename ID = dht::Value::Id>
1538std::set<ID, std::less<>>
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001539loadIdList(const std::filesystem::path& path)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001540{
1541 std::set<ID, std::less<>> ids;
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001542 std::ifstream file(path);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001543 if (!file.is_open()) {
1544 //JAMI_DBG("Could not load %s", path.c_str());
1545 return ids;
1546 }
1547 std::string line;
1548 while (std::getline(file, line)) {
1549 if constexpr (std::is_same<ID, std::string>::value) {
1550 ids.emplace(std::move(line));
1551 } else if constexpr (std::is_integral<ID>::value) {
1552 ID vid;
1553 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1554 ec == std::errc()) {
1555 ids.emplace(vid);
1556 }
1557 }
1558 }
1559 return ids;
1560}
1561
1562template<typename List = std::set<dht::Value::Id>>
1563void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001564saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001565{
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001566 std::ofstream file(path, std::ios::trunc | std::ios::binary);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001567 if (!file.is_open()) {
1568 //JAMI_ERR("Could not save to %s", path.c_str());
1569 return;
1570 }
1571 for (auto& c : ids)
1572 file << std::hex << c << "\n";
1573}
1574
1575void
1576ConnectionManager::Impl::loadTreatedMessages()
1577{
1578 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001579 auto path = config_->cachePath / "treatedMessages";
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001580 treatedMessages_ = loadIdList<std::string>(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001581 if (treatedMessages_.empty()) {
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001582 auto messages = loadIdList(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001583 for (const auto& m : messages)
1584 treatedMessages_.emplace(to_hex_string(m));
1585 }
1586}
1587
1588void
1589ConnectionManager::Impl::saveTreatedMessages() const
1590{
Adrien Béraud75754b22023-10-17 09:16:06 -04001591 dht::ThreadPool::io().run([w = weak_from_this()]() {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001592 if (auto sthis = w.lock()) {
1593 auto& this_ = *sthis;
1594 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1595 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001596 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001597 this_.treatedMessages_);
1598 }
1599 });
1600}
1601
1602bool
1603ConnectionManager::Impl::isMessageTreated(std::string_view id)
1604{
1605 std::lock_guard<std::mutex> lock(messageMutex_);
1606 auto res = treatedMessages_.emplace(id);
1607 if (res.second) {
1608 saveTreatedMessages();
1609 return false;
1610 }
1611 return true;
1612}
1613
1614/**
1615 * returns whether or not UPnP is enabled and active_
1616 * ie: if it is able to make port mappings
1617 */
1618bool
1619ConnectionManager::Impl::getUPnPActive() const
1620{
1621 return config_->getUPnPActive();
1622}
1623
1624IpAddr
1625ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1626{
1627 if (family == AF_INET)
1628 return publishedIp_[0];
1629 if (family == AF_INET6)
1630 return publishedIp_[1];
1631
1632 assert(family == AF_UNSPEC);
1633
1634 // If family is not set, prefere IPv4 if available. It's more
1635 // likely to succeed behind NAT.
1636 if (publishedIp_[0])
1637 return publishedIp_[0];
1638 if (publishedIp_[1])
1639 return publishedIp_[1];
1640 return {};
1641}
1642
1643void
1644ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1645{
1646 if (ip_addr.getFamily() == AF_INET) {
1647 publishedIp_[0] = ip_addr;
1648 } else {
1649 publishedIp_[1] = ip_addr;
1650 }
1651}
1652
1653void
1654ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1655{
Adrien Béraud75754b22023-10-17 09:16:06 -04001656 dht()->getPublicAddress([w=weak_from_this(), cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
Sébastien Blinb6504372023-10-12 10:35:35 -04001657 auto shared = w.lock();
1658 if (!shared)
1659 return;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001660 bool hasIpv4 {false}, hasIpv6 {false};
1661 for (auto& result : results) {
1662 auto family = result.getFamily();
1663 if (family == AF_INET) {
1664 if (not hasIpv4) {
1665 hasIpv4 = true;
Sébastien Blinb6504372023-10-12 10:35:35 -04001666 if (shared->config_->logger)
1667 shared->config_->logger->debug("Store DHT public IPv4 address: {}", result);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001668 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
Sébastien Blinb6504372023-10-12 10:35:35 -04001669 shared->setPublishedAddress(*result.get());
1670 if (shared->config_->upnpCtrl) {
1671 shared->config_->upnpCtrl->setPublicAddress(*result.get());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001672 }
1673 }
1674 } else if (family == AF_INET6) {
1675 if (not hasIpv6) {
1676 hasIpv6 = true;
Sébastien Blinb6504372023-10-12 10:35:35 -04001677 if (shared->config_->logger)
1678 shared->config_->logger->debug("Store DHT public IPv6 address: {}", result);
1679 shared->setPublishedAddress(*result.get());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001680 }
1681 }
1682 if (hasIpv4 and hasIpv6)
1683 break;
1684 }
1685 if (cb)
1686 cb();
1687 });
1688}
1689
1690void
1691ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1692{
1693 storeActiveIpAddress([this, cb = std::move(cb)] {
1694 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1695 auto publishedAddr = getPublishedIpAddress();
1696
1697 if (publishedAddr) {
1698 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1699 publishedAddr.getFamily());
1700 if (interfaceAddr) {
1701 opts.accountLocalAddr = interfaceAddr;
1702 opts.accountPublicAddr = publishedAddr;
1703 }
1704 }
1705 if (cb)
1706 cb(std::move(opts));
1707 });
1708}
1709
1710IceTransportOptions
1711ConnectionManager::Impl::getIceOptions() const noexcept
1712{
1713 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001714 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001715 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001716 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001717
1718 if (config_->stunEnabled)
1719 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1720 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001721 if (config_->turnCache) {
1722 auto turnAddr = config_->turnCache->getResolvedTurn();
1723 if (turnAddr != std::nullopt) {
1724 opts.turnServers.emplace_back(TurnServerInfo()
1725 .setUri(turnAddr->toString())
1726 .setUsername(config_->turnServerUserName)
1727 .setPassword(config_->turnServerPwd)
1728 .setRealm(config_->turnServerRealm));
1729 }
1730 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001731 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001732 .setUri(config_->turnServer)
1733 .setUsername(config_->turnServerUserName)
1734 .setPassword(config_->turnServerPwd)
1735 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001736 }
1737 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1738 // co issues. So this needs some debug. for now just disable
1739 // if (cacheTurnV6 && *cacheTurnV6) {
1740 // opts.turnServers.emplace_back(TurnServerInfo()
1741 // .setUri(cacheTurnV6->toString(true))
1742 // .setUsername(turnServerUserName_)
1743 // .setPassword(turnServerPwd_)
1744 // .setRealm(turnServerRealm_));
1745 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001746 }
1747 return opts;
1748}
1749
1750bool
1751ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1752 dht::InfoHash& account_id,
1753 const std::shared_ptr<Logger>& logger)
1754{
1755 if (not crt)
1756 return false;
1757
1758 auto top_issuer = crt;
1759 while (top_issuer->issuer)
1760 top_issuer = top_issuer->issuer;
1761
1762 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001763 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001764 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001765 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001766 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001767 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001768
1769 // Check peer certificate chain
1770 // Trust store with top issuer as the only CA
1771 dht::crypto::TrustList peer_trust;
1772 peer_trust.add(*top_issuer);
1773 if (not peer_trust.verify(*crt)) {
1774 if (logger)
1775 logger->warn("Found invalid peer device: {}", crt->getLongId());
1776 return false;
1777 }
1778
1779 // Check cached OCSP response
1780 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1781 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001782 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001783 return false;
1784 }
1785
Adrien Béraudc631a832023-07-26 22:19:00 -04001786 account_id = crt->issuer->getId();
1787 if (logger)
1788 logger->warn("Found peer device: {} account:{} CA:{}",
1789 crt->getLongId(),
1790 account_id,
1791 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001792 return true;
1793}
1794
1795bool
1796ConnectionManager::Impl::findCertificate(
1797 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1798{
1799 if (auto cert = certStore().getCertificate(id.toString())) {
1800 if (cb)
1801 cb(cert);
1802 } else if (cb)
1803 cb(nullptr);
1804 return true;
1805}
1806
Sébastien Blin34086512023-07-25 09:52:14 -04001807bool
1808ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1809 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1810{
1811 if (auto cert = certStore().getCertificate(h.toString())) {
1812 if (cb)
1813 cb(cert);
1814 } else {
1815 dht()->findCertificate(h,
1816 [cb = std::move(cb), this](
1817 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1818 if (crt)
1819 certStore().pinCertificate(crt);
1820 if (cb)
1821 cb(crt);
1822 });
1823 }
1824 return true;
1825}
1826
Amna81221ad2023-09-14 17:33:26 -04001827std::shared_ptr<ConnectionManager::Config>
1828buildDefaultConfig(dht::crypto::Identity id){
1829 auto conf = std::make_shared<ConnectionManager::Config>();
1830 conf->id = std::move(id);
1831 return conf;
1832}
1833
Adrien Béraud612b55b2023-05-29 10:42:04 -04001834ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1835 : pimpl_ {std::make_shared<Impl>(config_)}
1836{}
1837
Amna81221ad2023-09-14 17:33:26 -04001838ConnectionManager::ConnectionManager(dht::crypto::Identity id)
1839 : ConnectionManager {buildDefaultConfig(id)}
1840{}
1841
Adrien Béraud612b55b2023-05-29 10:42:04 -04001842ConnectionManager::~ConnectionManager()
1843{
1844 if (pimpl_)
1845 pimpl_->shutdown();
1846}
1847
1848void
1849ConnectionManager::connectDevice(const DeviceId& deviceId,
1850 const std::string& name,
1851 ConnectCallback cb,
1852 bool noNewSocket,
1853 bool forceNewSocket,
1854 const std::string& connType)
1855{
1856 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1857}
1858
1859void
Amna0cf544d2023-07-25 14:25:09 -04001860ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1861 const std::string& name,
1862 ConnectCallbackLegacy cb,
1863 bool noNewSocket,
1864 bool forceNewSocket,
1865 const std::string& connType)
1866{
1867 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1868}
1869
1870
1871void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001872ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1873 const std::string& name,
1874 ConnectCallback cb,
1875 bool noNewSocket,
1876 bool forceNewSocket,
1877 const std::string& connType)
1878{
1879 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1880}
1881
1882bool
1883ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1884{
Adrien Béraud75754b22023-10-17 09:16:06 -04001885 if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) {
1886 std::unique_lock<std::mutex> lk {dinfo->mtx_};
Adrien Béraudb941e922023-10-16 12:56:14 -04001887 return dinfo->isConnecting(name);
Adrien Béraud75754b22023-10-17 09:16:06 -04001888 }
1889 return false;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001890}
1891
Sébastien Blind0c92c72023-12-07 15:27:51 -05001892bool
1893ConnectionManager::isConnected(const DeviceId& deviceId) const
1894{
1895 if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) {
1896 std::unique_lock<std::mutex> lk {dinfo->mtx_};
1897 return dinfo->getConnectedInfo() != nullptr;
1898 }
1899 return false;
1900}
1901
Adrien Béraud612b55b2023-05-29 10:42:04 -04001902void
1903ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1904{
Adrien Béraud75754b22023-10-17 09:16:06 -04001905 std::vector<std::shared_ptr<DeviceInfo>> dInfos;
1906 for (const auto& dinfo: pimpl_->infos_.getDeviceInfos()) {
1907 std::unique_lock<std::mutex> lk(dinfo->mtx_);
1908 bool isPeer = false;
1909 for (auto const& [id, cinfo]: dinfo->info) {
1910 std::lock_guard<std::mutex> lkv {cinfo->mutex_};
1911 auto tls = cinfo->tls_ ? cinfo->tls_.get() : (cinfo->socket_ ? cinfo->socket_->endpoint() : nullptr);
Adrien Béraudafa8e282023-09-24 12:53:20 -04001912 auto cert = tls ? tls->peerCertificate() : nullptr;
1913 if (not cert)
Adrien Béraud75754b22023-10-17 09:16:06 -04001914 cert = pimpl_->certStore().getCertificate(dinfo->deviceId.toString());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001915 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001916 isPeer = true;
1917 break;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001918 }
1919 }
Adrien Béraud75754b22023-10-17 09:16:06 -04001920 lk.unlock();
1921 if (isPeer) {
1922 dInfos.emplace_back(std::move(dinfo));
1923 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001924 }
1925 // Stop connections to all peers devices
Adrien Béraud75754b22023-10-17 09:16:06 -04001926 for (const auto& dinfo : dInfos) {
1927 std::unique_lock<std::mutex> lk {dinfo->mtx_};
1928 auto unused = dinfo->extractUnusedConnections();
1929 auto pending = dinfo->extractPendingOperations(0, nullptr);
1930 pimpl_->infos_.removeDeviceInfo(dinfo->deviceId);
1931 lk.unlock();
1932 for (auto& op : unused)
1933 op->shutdown();
1934 for (auto& op : pending)
1935 op.cb(nullptr, dinfo->deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001936 }
1937}
1938
1939void
1940ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1941{
1942 pimpl_->onDhtConnected(devicePk);
1943}
1944
1945void
1946ConnectionManager::onICERequest(onICERequestCallback&& cb)
1947{
1948 pimpl_->iceReqCb_ = std::move(cb);
1949}
1950
1951void
1952ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1953{
1954 pimpl_->channelReqCb_ = std::move(cb);
1955}
1956
1957void
1958ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1959{
1960 pimpl_->connReadyCb_ = std::move(cb);
1961}
1962
1963void
1964ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1965{
1966 pimpl_->iOSConnectedCb_ = std::move(cb);
1967}
1968
1969std::size_t
1970ConnectionManager::activeSockets() const
1971{
Adrien Béraud75754b22023-10-17 09:16:06 -04001972 return pimpl_->infos_.getConnectedInfos().size();
Adrien Béraud612b55b2023-05-29 10:42:04 -04001973}
1974
1975void
1976ConnectionManager::monitor() const
1977{
Adrien Béraud612b55b2023-05-29 10:42:04 -04001978 auto logger = pimpl_->config_->logger;
1979 if (!logger)
1980 return;
1981 logger->debug("ConnectionManager current status:");
Adrien Béraud75754b22023-10-17 09:16:06 -04001982 for (const auto& ci : pimpl_->infos_.getConnectedInfos()) {
1983 std::lock_guard<std::mutex> lk(ci->mutex_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001984 if (ci->socket_)
1985 ci->socket_->monitor();
1986 }
1987 logger->debug("ConnectionManager end status.");
1988}
1989
1990void
1991ConnectionManager::connectivityChanged()
1992{
Adrien Béraud75754b22023-10-17 09:16:06 -04001993 for (const auto& ci : pimpl_->infos_.getConnectedInfos()) {
1994 std::lock_guard<std::mutex> lk(ci->mutex_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001995 if (ci->socket_)
Adrien Béraud51a54712023-10-17 21:24:30 -04001996 dht::ThreadPool::io().run([s = ci->socket_] { s->sendBeacon(); });
Adrien Béraud612b55b2023-05-29 10:42:04 -04001997 }
1998}
1999
2000void
2001ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
2002{
2003 return pimpl_->getIceOptions(std::move(cb));
2004}
2005
2006IceTransportOptions
2007ConnectionManager::getIceOptions() const noexcept
2008{
2009 return pimpl_->getIceOptions();
2010}
2011
2012IpAddr
2013ConnectionManager::getPublishedIpAddress(uint16_t family) const
2014{
2015 return pimpl_->getPublishedIpAddress(family);
2016}
2017
2018void
2019ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
2020{
2021 return pimpl_->setPublishedAddress(ip_addr);
2022}
2023
2024void
2025ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
2026{
2027 return pimpl_->storeActiveIpAddress(std::move(cb));
2028}
2029
2030std::shared_ptr<ConnectionManager::Config>
2031ConnectionManager::getConfig()
2032{
2033 return pimpl_->config_;
2034}
2035
Amna31791e52023-08-03 12:40:57 -04002036std::vector<std::map<std::string, std::string>>
2037ConnectionManager::getConnectionList(const DeviceId& device) const
2038{
2039 std::vector<std::map<std::string, std::string>> connectionsList;
Amna31791e52023-08-03 12:40:57 -04002040 if (device) {
Adrien Béraud75754b22023-10-17 09:16:06 -04002041 if (auto deviceInfo = pimpl_->infos_.getDeviceInfo(device)) {
2042 connectionsList = deviceInfo->getConnectionList(pimpl_->certStore());
Amna31791e52023-08-03 12:40:57 -04002043 }
Adrien Béraud75754b22023-10-17 09:16:06 -04002044 } else {
2045 for (const auto& deviceInfo : pimpl_->infos_.getDeviceInfos()) {
2046 auto cl = deviceInfo->getConnectionList(pimpl_->certStore());
2047 connectionsList.insert(connectionsList.end(), std::make_move_iterator(cl.begin()), std::make_move_iterator(cl.end()));
Amna31791e52023-08-03 12:40:57 -04002048 }
2049 }
2050 return connectionsList;
2051}
2052
2053std::vector<std::map<std::string, std::string>>
2054ConnectionManager::getChannelList(const std::string& connectionId) const
2055{
Adrien Béraud75754b22023-10-17 09:16:06 -04002056 auto [deviceId, valueId] = parseCallbackId(connectionId);
2057 if (auto info = pimpl_->infos_.getInfo(deviceId, valueId)) {
2058 std::lock_guard<std::mutex> lk(info->mutex_);
2059 if (info->socket_)
2060 return info->socket_->getChannelList();
Amna31791e52023-08-03 12:40:57 -04002061 }
Adrien Béraud75754b22023-10-17 09:16:06 -04002062 return {};
Amna31791e52023-08-03 12:40:57 -04002063}
2064
Sébastien Blin464bdff2023-07-19 08:02:53 -04002065} // namespace dhtnet