| /* |
| * Copyright (C) 2004-2023 Savoir-faire Linux Inc. |
| * |
| * This program is free software: you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation, either version 3 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program. If not, see <https://www.gnu.org/licenses/>. |
| */ |
| #include "connectionmanager.h" |
| #include "peer_connection.h" |
| #include "upnp/upnp_control.h" |
| #include "certstore.h" |
| #include "fileutils.h" |
| #include "sip_utils.h" |
| #include "string_utils.h" |
| |
| #include <opendht/crypto.h> |
| #include <opendht/thread_pool.h> |
| #include <opendht/value.h> |
| #include <asio.hpp> |
| |
| #include <algorithm> |
| #include <mutex> |
| #include <map> |
| #include <condition_variable> |
| #include <set> |
| #include <charconv> |
| #include <fstream> |
| |
| namespace dhtnet { |
| static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30}; |
| static constexpr uint64_t ID_MAX_VAL = 9007199254740992; |
| |
| using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>; |
| |
| std::string |
| callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid) |
| { |
| return fmt::format("{} {}", did.to_view(), vid); |
| } |
| |
| std::pair<dhtnet::DeviceId, dht::Value::Id> parseCallbackId(std::string_view ci) |
| { |
| auto sep = ci.find(' '); |
| std::string_view deviceIdString = ci.substr(0, sep); |
| std::string_view vidString = ci.substr(sep + 1); |
| |
| dhtnet::DeviceId deviceId(deviceIdString); |
| dht::Value::Id vid = std::stoull(std::string(vidString), nullptr, 10); |
| return {deviceId, vid}; |
| } |
| |
| std::shared_ptr<ConnectionManager::Config> |
| createConfig(std::shared_ptr<ConnectionManager::Config> config_) |
| { |
| if (!config_->certStore){ |
| config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger); |
| } |
| if (!config_->dht) { |
| dht::DhtRunner::Config dhtConfig; |
| dhtConfig.dht_config.id = config_->id; |
| dhtConfig.threaded = true; |
| dht::DhtRunner::Context dhtContext; |
| dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) { |
| std::vector<std::shared_ptr<dht::crypto::Certificate>> ret; |
| if (auto cert = c->getCertificate(pk_id.toString())) |
| ret.emplace_back(std::move(cert)); |
| return ret; |
| }; |
| config_->dht = std::make_shared<dht::DhtRunner>(); |
| config_->dht->run(dhtConfig, std::move(dhtContext)); |
| config_->dht->bootstrap("bootstrap.jami.net"); |
| } |
| if (!config_->factory){ |
| config_->factory = std::make_shared<IceTransportFactory>(config_->logger); |
| } |
| return config_; |
| } |
| |
| struct ConnectionInfo |
| { |
| ~ConnectionInfo() |
| { |
| if (socket_) |
| socket_->join(); |
| } |
| |
| std::mutex mutex_ {}; |
| bool responseReceived_ {false}; |
| PeerConnectionRequest response_ {}; |
| std::unique_ptr<IceTransport> ice_ {nullptr}; |
| // Used to store currently non ready TLS Socket |
| std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr}; |
| std::shared_ptr<MultiplexedSocket> socket_ {}; |
| std::set<dht::Value::Id> cbIds_ {}; |
| |
| std::function<void(bool)> onConnected_; |
| std::unique_ptr<asio::steady_timer> waitForAnswer_ {}; |
| |
| void shutdown() { |
| std::lock_guard lk(mutex_); |
| if (tls_) |
| tls_->shutdown(); |
| if (socket_) |
| socket_->shutdown(); |
| if (waitForAnswer_) |
| waitForAnswer_->cancel(); |
| if (ice_) { |
| dht::ThreadPool::io().run( |
| [ice = std::shared_ptr<IceTransport>(std::move(ice_))] {}); |
| } |
| } |
| |
| std::map<std::string, std::string> |
| getInfo(const DeviceId& deviceId, dht::Value::Id valueId, tls::CertificateStore& certStore) const |
| { |
| std::map<std::string, std::string> connectionInfo; |
| connectionInfo["id"] = callbackIdToString(deviceId, valueId); |
| connectionInfo["device"] = deviceId.toString(); |
| auto cert = tls_ ? tls_->peerCertificate() : (socket_ ? socket_->peerCertificate() : nullptr); |
| if (not cert) |
| cert = certStore.getCertificate(deviceId.toString()); |
| if (cert) { |
| connectionInfo["peer"] = cert->issuer->getId().toString(); |
| } |
| if (socket_) { |
| connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected)); |
| connectionInfo["remoteAddress"] = socket_->getRemoteAddress(); |
| } else if (tls_) { |
| connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS)); |
| connectionInfo["remoteAddress"] = tls_->getRemoteAddress(); |
| } else if(ice_) { |
| connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE)); |
| connectionInfo["remoteAddress"] = ice_->getRemoteAddress(ICE_COMP_ID_SIP_TRANSPORT); |
| } |
| return connectionInfo; |
| } |
| }; |
| |
| struct PendingCb { |
| std::string name; |
| ConnectCallback cb; |
| bool requested {false}; |
| }; |
| |
| struct DeviceInfo { |
| const DeviceId deviceId; |
| mutable std::mutex mtx_ {}; |
| std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info; |
| std::map<dht::Value::Id, PendingCb> connecting; |
| std::map<dht::Value::Id, PendingCb> waiting; |
| DeviceInfo(DeviceId id) : deviceId {id} {} |
| |
| inline bool isConnecting() const { |
| return !connecting.empty() || !waiting.empty(); |
| } |
| |
| inline bool empty() const { |
| return info.empty() && connecting.empty() && waiting.empty(); |
| } |
| |
| dht::Value::Id newId(std::mt19937_64& rand) const { |
| ValueIdDist dist(1, ID_MAX_VAL); |
| dht::Value::Id id; |
| do { |
| id = dist(rand); |
| } while (info.find(id) != info.end() |
| || connecting.find(id) != connecting.end() |
| || waiting.find(id) != waiting.end()); |
| return id; |
| } |
| |
| std::shared_ptr<ConnectionInfo> getConnectedInfo() const { |
| for (auto& [id, ci] : info) { |
| if (ci->socket_) |
| return ci; |
| } |
| return {}; |
| } |
| |
| std::vector<PendingCb> extractPendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) |
| { |
| std::vector<PendingCb> ret; |
| if (vid == 0) { |
| // Extract all pending callbacks |
| ret.reserve(connecting.size() + waiting.size()); |
| for (auto& [vid, cb] : connecting) |
| ret.emplace_back(std::move(cb)); |
| connecting.clear(); |
| for (auto& [vid, cb] : waiting) |
| ret.emplace_back(std::move(cb)); |
| waiting.clear(); |
| } else if (auto n = waiting.extract(vid)) { |
| // If it's a waiting operation, just move it |
| ret.emplace_back(std::move(n.mapped())); |
| } else if (auto n = connecting.extract(vid)) { |
| ret.emplace_back(std::move(n.mapped())); |
| // If sock is nullptr, execute if it's the last connecting operation |
| // If accepted is false, it means that underlying socket is ok, but channel is declined |
| if (!sock && connecting.empty() && accepted) { |
| for (auto& [vid, cb] : waiting) |
| ret.emplace_back(std::move(cb)); |
| waiting.clear(); |
| for (auto& [vid, cb] : connecting) |
| ret.emplace_back(std::move(cb)); |
| connecting.clear(); |
| } |
| } |
| return ret; |
| } |
| |
| std::vector<std::shared_ptr<ConnectionInfo>> extractUnusedConnections() { |
| std::vector<std::shared_ptr<ConnectionInfo>> unused {}; |
| for (auto& [id, info] : info) |
| unused.emplace_back(std::move(info)); |
| info.clear(); |
| return unused; |
| } |
| |
| void executePendingOperations(std::unique_lock<std::mutex>& lock, dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) { |
| auto ops = extractPendingOperations(vid, sock, accepted); |
| lock.unlock(); |
| for (auto& cb : ops) |
| cb.cb(sock, deviceId); |
| } |
| void executePendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) { |
| std::unique_lock lock(mtx_); |
| executePendingOperations(lock, vid, sock, accepted); |
| } |
| |
| bool isConnecting(const std::string& name) const { |
| for (const auto& [id, pc]: connecting) |
| if (pc.name == name) |
| return true; |
| for (const auto& [id, pc]: waiting) |
| if (pc.name == name) |
| return true; |
| return false; |
| } |
| std::map<dht::Value::Id, std::string> requestPendingOps() { |
| std::map<dht::Value::Id, std::string> ret; |
| for (auto& [id, pc]: connecting) { |
| if (!pc.requested) { |
| ret[id] = pc.name; |
| pc.requested = true; |
| } |
| } |
| for (auto& [id, pc]: waiting) { |
| if (!pc.requested) { |
| ret[id] = pc.name; |
| pc.requested = true; |
| } |
| } |
| return ret; |
| } |
| |
| std::vector<std::map<std::string, std::string>> |
| getConnectionList(tls::CertificateStore& certStore) const { |
| std::lock_guard lk(mtx_); |
| std::vector<std::map<std::string, std::string>> ret; |
| ret.reserve(info.size() + connecting.size() + waiting.size()); |
| for (auto& [id, ci] : info) { |
| std::lock_guard lk(ci->mutex_); |
| ret.emplace_back(ci->getInfo(deviceId, id, certStore)); |
| } |
| auto cert = certStore.getCertificate(deviceId.toString()); |
| for (const auto& [vid, ci] : connecting) { |
| ret.emplace_back(std::map<std::string, std::string> { |
| {"id", callbackIdToString(deviceId, vid)}, |
| {"status", std::to_string(static_cast<int>(ConnectionStatus::Connecting))}, |
| {"device", deviceId.toString()}, |
| {"peer", cert ? cert->issuer->getId().toString() : ""} |
| }); |
| } |
| for (const auto& [vid, ci] : waiting) { |
| ret.emplace_back(std::map<std::string, std::string> { |
| {"id", callbackIdToString(deviceId, vid)}, |
| {"status", std::to_string(static_cast<int>(ConnectionStatus::Waiting))}, |
| {"device", deviceId.toString()}, |
| {"peer", cert ? cert->issuer->getId().toString() : ""} |
| }); |
| } |
| return ret; |
| } |
| }; |
| |
| class DeviceInfoSet { |
| public: |
| std::shared_ptr<DeviceInfo> getDeviceInfo(const DeviceId& deviceId) { |
| std::lock_guard lk(mtx_); |
| auto it = infos_.find(deviceId); |
| if (it != infos_.end()) |
| return it->second; |
| return {}; |
| } |
| |
| std::vector<std::shared_ptr<DeviceInfo>> getDeviceInfos() { |
| std::vector<std::shared_ptr<DeviceInfo>> deviceInfos; |
| std::lock_guard lk(mtx_); |
| deviceInfos.reserve(infos_.size()); |
| for (auto& [deviceId, info] : infos_) |
| deviceInfos.emplace_back(info); |
| return deviceInfos; |
| } |
| |
| std::shared_ptr<DeviceInfo> createDeviceInfo(const DeviceId& deviceId) { |
| std::lock_guard lk(mtx_); |
| auto& info = infos_[deviceId]; |
| if (!info) |
| info = std::make_shared<DeviceInfo>(deviceId); |
| return info; |
| } |
| |
| bool removeDeviceInfo(const DeviceId& deviceId) { |
| std::lock_guard lk(mtx_); |
| return infos_.erase(deviceId) != 0; |
| } |
| |
| std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) { |
| if (auto info = getDeviceInfo(deviceId)) { |
| std::lock_guard lk(info->mtx_); |
| auto it = info->info.find(id); |
| if (it != info->info.end()) |
| return it->second; |
| } |
| return {}; |
| } |
| |
| std::vector<std::shared_ptr<ConnectionInfo>> getConnectedInfos() { |
| auto deviceInfos = getDeviceInfos(); |
| std::vector<std::shared_ptr<ConnectionInfo>> ret; |
| ret.reserve(deviceInfos.size()); |
| for (auto& info : deviceInfos) { |
| std::lock_guard lk(info->mtx_); |
| for (auto& [id, ci] : info->info) { |
| if (ci->socket_) |
| ret.emplace_back(ci); |
| } |
| } |
| return ret; |
| } |
| std::vector<std::shared_ptr<DeviceInfo>> shutdown() { |
| std::vector<std::shared_ptr<DeviceInfo>> ret; |
| std::lock_guard lk(mtx_); |
| ret.reserve(infos_.size()); |
| for (auto& [deviceId, info] : infos_) { |
| ret.emplace_back(std::move(info)); |
| } |
| infos_.clear(); |
| return ret; |
| } |
| |
| private: |
| std::mutex mtx_ {}; |
| std::map<DeviceId, std::shared_ptr<DeviceInfo>> infos_ {}; |
| }; |
| |
| class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl> |
| { |
| public: |
| explicit Impl(std::shared_ptr<ConnectionManager::Config> config_) |
| : config_ {std::move(createConfig(config_))} |
| , rand_ {config_->rng ? *config_->rng : dht::crypto::getSeededRandomEngine<std::mt19937_64>()} |
| , treatedMessages_ {config_->cachePath / "treatedMessages"} |
| { |
| if(!config_->ioContext) { |
| config_->ioContext = std::make_shared<asio::io_context>(); |
| ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() { |
| try { |
| auto work = asio::make_work_guard(*context); |
| context->run(); |
| } catch (const std::exception& ex) { |
| if (l) l->error("Exception: {}", ex.what()); |
| } |
| }); |
| } |
| } |
| ~Impl() { |
| if (ioContextRunner_) { |
| if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread"); |
| config_->ioContext->stop(); |
| ioContextRunner_->join(); |
| ioContextRunner_.reset(); |
| } |
| } |
| |
| std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; } |
| const dht::crypto::Identity& identity() const { return config_->id; } |
| |
| void shutdown() |
| { |
| if (isDestroying_.exchange(true)) |
| return; |
| std::vector<std::shared_ptr<ConnectionInfo>> unused; |
| std::vector<std::pair<DeviceId, std::vector<PendingCb>>> pending; |
| for (auto& dinfo: infos_.shutdown()) { |
| std::lock_guard lk(dinfo->mtx_); |
| auto p = dinfo->extractPendingOperations(0, nullptr, false); |
| if (!p.empty()) |
| pending.emplace_back(dinfo->deviceId, std::move(p)); |
| auto uc = dinfo->extractUnusedConnections(); |
| unused.insert(unused.end(), std::make_move_iterator(uc.begin()), std::make_move_iterator(uc.end())); |
| } |
| for (auto& info: unused) |
| info->shutdown(); |
| for (auto& op: pending) |
| for (auto& cb: op.second) |
| cb.cb(nullptr, op.first); |
| if (!unused.empty()) |
| dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { |
| infos.clear(); |
| }); |
| } |
| |
| void connectDeviceStartIce(const std::shared_ptr<ConnectionInfo>& info, |
| const std::shared_ptr<dht::crypto::PublicKey>& devicePk, |
| const dht::Value::Id& vid, |
| const std::string& connType, |
| std::function<void(bool)> onConnected); |
| void onResponse(const asio::error_code& ec, const std::weak_ptr<ConnectionInfo>& info, const DeviceId& deviceId, const dht::Value::Id& vid); |
| bool connectDeviceOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, |
| const std::shared_ptr<ConnectionInfo>& info, |
| const DeviceId& deviceId, |
| const std::string& name, |
| const dht::Value::Id& vid, |
| const std::shared_ptr<dht::crypto::Certificate>& cert); |
| void connectDevice(const DeviceId& deviceId, |
| const std::string& uri, |
| ConnectCallback cb, |
| bool noNewSocket = false, |
| bool forceNewSocket = false, |
| const std::string& connType = ""); |
| void connectDevice(const dht::InfoHash& deviceId, |
| const std::string& uri, |
| ConnectCallbackLegacy cb, |
| bool noNewSocket = false, |
| bool forceNewSocket = false, |
| const std::string& connType = ""); |
| |
| void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket = false, |
| bool forceNewSocket = false, |
| const std::string& connType = ""); |
| /** |
| * Send a ChannelRequest on the TLS socket. Triggers cb when ready |
| * @param sock socket used to send the request |
| * @param name channel's name |
| * @param vid channel's id |
| * @param deviceId to identify the linked ConnectCallback |
| */ |
| void sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfo, |
| const std::weak_ptr<ConnectionInfo>& cinfo, |
| const std::shared_ptr<MultiplexedSocket>& sock, |
| const std::string& name, |
| const dht::Value::Id& vid); |
| /** |
| * Triggered when a PeerConnectionRequest comes from the DHT |
| */ |
| void answerTo(IceTransport& ice, |
| const dht::Value::Id& id, |
| const std::shared_ptr<dht::crypto::PublicKey>& fromPk); |
| bool onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req); |
| bool onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req); |
| void onDhtPeerRequest(const PeerConnectionRequest& req, |
| const std::shared_ptr<dht::crypto::Certificate>& cert); |
| /** |
| * Triggered when a new TLS socket is ready to use |
| * @param ok If succeed |
| * @param deviceId Related device |
| * @param vid vid of the connection request |
| * @param name non empty if TLS was created by connectDevice() |
| */ |
| void onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo, |
| const std::shared_ptr<ConnectionInfo>& info, |
| bool ok, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid, |
| const std::string& name = ""); |
| |
| void addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info); |
| void onPeerResponse(PeerConnectionRequest&& req); |
| void onDhtConnected(const dht::crypto::PublicKey& devicePk); |
| |
| |
| const std::shared_future<tls::DhParams> dhParams() const; |
| tls::CertificateStore& certStore() const { return *config_->certStore; } |
| |
| mutable std::mutex messageMutex_ {}; |
| fileutils::IdList treatedMessages_; |
| |
| /// \return true if the given DHT message identifier has been treated |
| /// \note if message has not been treated yet this method st/ore this id and returns true at |
| /// further calls |
| bool isMessageTreated(dht::Value::Id id); |
| |
| const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; } |
| |
| /** |
| * Published IPv4/IPv6 addresses, used only if defined by the user in account |
| * configuration |
| * |
| */ |
| IpAddr publishedIp_[2] {}; |
| |
| /** |
| * interface name on which this account is bound |
| */ |
| std::string interface_ {"default"}; |
| |
| /** |
| * Get the local interface name on which this account is bound. |
| */ |
| const std::string& getLocalInterface() const { return interface_; } |
| |
| /** |
| * Get the published IP address, fallbacks to NAT if family is unspecified |
| * Prefers the usage of IPv4 if possible. |
| */ |
| IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const; |
| |
| /** |
| * Set published IP address according to given family |
| */ |
| void setPublishedAddress(const IpAddr& ip_addr); |
| |
| /** |
| * Store the local/public addresses used to register |
| */ |
| void storeActiveIpAddress(std::function<void()>&& cb = {}); |
| |
| /** |
| * Create and return ICE options. |
| */ |
| void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept; |
| IceTransportOptions getIceOptions() const noexcept; |
| |
| /** |
| * Inform that a potential peer device have been found. |
| * Returns true only if the device certificate is a valid device certificate. |
| * In that case (true is returned) the account_id parameter is set to the peer account ID. |
| */ |
| static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt, |
| dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger); |
| |
| bool findCertificate(const dht::PkId& id, |
| std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb); |
| bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb); |
| |
| std::shared_ptr<ConnectionManager::Config> config_; |
| std::unique_ptr<std::thread> ioContextRunner_; |
| |
| mutable std::mutex randMtx_; |
| mutable std::mt19937_64 rand_; |
| |
| iOSConnectedCallback iOSConnectedCb_ {}; |
| |
| DeviceInfoSet infos_ {}; |
| |
| ChannelRequestCallback channelReqCb_ {}; |
| ConnectionReadyCallback connReadyCb_ {}; |
| onICERequestCallback iceReqCb_ {}; |
| std::atomic_bool isDestroying_ {false}; |
| }; |
| |
| void |
| ConnectionManager::Impl::connectDeviceStartIce( |
| const std::shared_ptr<ConnectionInfo>& info, |
| const std::shared_ptr<dht::crypto::PublicKey>& devicePk, |
| const dht::Value::Id& vid, |
| const std::string& connType, |
| std::function<void(bool)> onConnected) |
| { |
| auto deviceId = devicePk->getLongId(); |
| if (!info) { |
| onConnected(false); |
| return; |
| } |
| |
| std::unique_lock lk(info->mutex_); |
| auto& ice = info->ice_; |
| |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("[device {}] No ICE detected", deviceId); |
| onConnected(false); |
| return; |
| } |
| |
| auto iceAttributes = ice->getLocalAttributes(); |
| std::ostringstream icemsg; |
| icemsg << iceAttributes.ufrag << "\n"; |
| icemsg << iceAttributes.pwd << "\n"; |
| for (const auto& addr : ice->getLocalCandidates(1)) { |
| icemsg << addr << "\n"; |
| if (config_->logger) |
| config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr); |
| } |
| |
| // Prepare connection request as a DHT message |
| PeerConnectionRequest val; |
| |
| val.id = vid; /* Random id for the message unicity */ |
| val.ice_msg = icemsg.str(); |
| val.connType = connType; |
| |
| auto value = std::make_shared<dht::Value>(std::move(val)); |
| value->user_type = "peer_request"; |
| |
| // Send connection request through DHT |
| if (config_->logger) |
| config_->logger->debug("[device {}] Sending connection request", deviceId); |
| dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix |
| + devicePk->getId().toString()), |
| devicePk, |
| value, |
| [l=config_->logger,deviceId](bool ok) { |
| if (l) |
| l->debug("[device {}] Sent connection request. Put encrypted {:s}", |
| deviceId, |
| (ok ? "ok" : "failed")); |
| }); |
| // Wait for call to onResponse() operated by DHT |
| if (isDestroying_) { |
| onConnected(true); // This avoid to wait new negotiation when destroying |
| return; |
| } |
| |
| info->onConnected_ = std::move(onConnected); |
| info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext, |
| std::chrono::steady_clock::now() |
| + DHT_MSG_TIMEOUT); |
| info->waitForAnswer_->async_wait( |
| std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, info, deviceId, vid)); |
| } |
| |
| void |
| ConnectionManager::Impl::onResponse(const asio::error_code& ec, |
| const std::weak_ptr<ConnectionInfo>& winfo, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid) |
| { |
| if (ec == asio::error::operation_aborted) |
| return; |
| auto info = winfo.lock(); |
| if (!info) |
| return; |
| |
| std::unique_lock lk(info->mutex_); |
| auto& ice = info->ice_; |
| if (isDestroying_) { |
| info->onConnected_(true); // The destructor can wake a pending wait here. |
| return; |
| } |
| if (!info->responseReceived_) { |
| if (config_->logger) |
| config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId); |
| info->onConnected_(false); |
| return; |
| } |
| |
| if (!info->ice_) { |
| info->onConnected_(false); |
| return; |
| } |
| |
| auto sdp = ice->parseIceCandidates(info->response_.ice_msg); |
| |
| if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { |
| if (config_->logger) |
| config_->logger->warn("[device {}] start ICE failed", deviceId); |
| info->onConnected_(false); |
| return; |
| } |
| info->onConnected_(true); |
| } |
| |
| bool |
| ConnectionManager::Impl::connectDeviceOnNegoDone( |
| const std::weak_ptr<DeviceInfo>& dinfo, |
| const std::shared_ptr<ConnectionInfo>& info, |
| const DeviceId& deviceId, |
| const std::string& name, |
| const dht::Value::Id& vid, |
| const std::shared_ptr<dht::crypto::Certificate>& cert) |
| { |
| if (!info) |
| return false; |
| |
| std::unique_lock lk {info->mutex_}; |
| if (info->waitForAnswer_) { |
| // Negotiation is done and connected, go to handshake |
| // and avoid any cancellation at this point. |
| info->waitForAnswer_->cancel(); |
| } |
| auto& ice = info->ice_; |
| if (!ice || !ice->isRunning()) { |
| if (config_->logger) |
| config_->logger->error("[device {}] No ICE detected or not running", deviceId); |
| return false; |
| } |
| |
| // Build socket |
| auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( |
| std::move(ice)), |
| true); |
| |
| // Negotiate a TLS session |
| if (config_->logger) |
| config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid); |
| info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint), |
| certStore(), |
| config_->ioContext, |
| identity(), |
| dhParams(), |
| *cert); |
| |
| info->tls_->setOnReady( |
| [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)]( |
| bool ok) { |
| if (auto shared = w.lock()) |
| if (auto info = winfo.lock()) { |
| shared->onTlsNegotiationDone(dinfo.lock(), info, ok, deviceId, vid, name); |
| // Make another reference to info to avoid destruction (could lead to a deadlock/crash). |
| dht::ThreadPool::io().run([info = std::move(info)] {}); |
| } |
| }); |
| return true; |
| } |
| |
| void |
| ConnectionManager::Impl::connectDevice(const DeviceId& deviceId, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| if (!dht()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (deviceId.toString() == identity().second->getLongId().toString()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| findCertificate(deviceId, |
| [w = weak_from_this(), |
| deviceId, |
| name, |
| cb = std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType](const std::shared_ptr<dht::crypto::Certificate>& cert) { |
| if (!cert) { |
| if (auto shared = w.lock()) |
| if (shared->config_->logger) |
| shared->config_->logger->error( |
| "No valid certificate found for device {}", |
| deviceId); |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (auto shared = w.lock()) { |
| shared->connectDevice(cert, |
| name, |
| std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType); |
| } else |
| cb(nullptr, deviceId); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId, |
| const std::string& name, |
| ConnectCallbackLegacy cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| if (!dht()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (deviceId.toString() == identity().second->getLongId().toString()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| findCertificate(deviceId, |
| [w = weak_from_this(), |
| deviceId, |
| name, |
| cb = std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType](const std::shared_ptr<dht::crypto::Certificate>& cert) { |
| if (!cert) { |
| if (auto shared = w.lock()) |
| if (shared->config_->logger) |
| shared->config_->logger->error( |
| "No valid certificate found for device {}", |
| deviceId); |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (auto shared = w.lock()) { |
| shared->connectDevice(cert, |
| name, |
| [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){ |
| cb(sock, deviceId); |
| }, |
| noNewSocket, |
| forceNewSocket, |
| connType); |
| } else |
| cb(nullptr, deviceId); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| // Avoid dht operation in a DHT callback to avoid deadlocks |
| dht::ThreadPool::computation().run([w = weak_from_this(), |
| name = std::move(name), |
| cert = std::move(cert), |
| cb = std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType] { |
| auto devicePk = cert->getSharedPublicKey(); |
| auto deviceId = devicePk->getLongId(); |
| auto sthis = w.lock(); |
| if (!sthis || sthis->isDestroying_) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| auto di = sthis->infos_.createDeviceInfo(deviceId); |
| std::unique_lock lk(di->mtx_); |
| |
| dht::Value::Id vid; |
| { |
| std::lock_guard lkr(sthis->randMtx_); |
| vid = di->newId(sthis->rand_); |
| } |
| |
| // Check if already connecting |
| auto isConnectingToDevice = di->isConnecting(); |
| // Note: we can be in a state where first |
| // socket is negotiated and first channel is pending |
| // so return only after we checked the info |
| auto& diw = (isConnectingToDevice && !forceNewSocket) |
| ? di->waiting[vid] |
| : di->connecting[vid]; |
| diw = PendingCb {name, std::move(cb)}; |
| |
| // Check if already negotiated |
| if (auto info = di->getConnectedInfo()) { |
| std::unique_lock lkc(info->mutex_); |
| if (auto sock = info->socket_) { |
| info->cbIds_.emplace(vid); |
| diw.requested = true; |
| lkc.unlock(); |
| lk.unlock(); |
| if (sthis->config_->logger) |
| sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId); |
| sthis->sendChannelRequest(di, info, sock, name, vid); |
| return; |
| } |
| } |
| |
| if (isConnectingToDevice && !forceNewSocket) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId); |
| return; |
| } |
| if (noNewSocket) { |
| // If no new socket is specified, we don't try to generate a new socket |
| di->executePendingOperations(lk, vid, nullptr); |
| return; |
| } |
| |
| // Note: used when the ice negotiation fails to erase |
| // all stored structures. |
| auto eraseInfo = [w, diw=std::weak_ptr(di), vid] { |
| if (auto di = diw.lock()) { |
| std::unique_lock lk(di->mtx_); |
| di->info.erase(vid); |
| auto ops = di->extractPendingOperations(vid, nullptr); |
| if (di->empty()) { |
| if (auto shared = w.lock()) |
| shared->infos_.removeDeviceInfo(di->deviceId); |
| } |
| lk.unlock(); |
| for (const auto& op: ops) |
| op.cb(nullptr, di->deviceId); |
| } |
| }; |
| |
| // If no socket exists, we need to initiate an ICE connection. |
| sthis->getIceOptions([w, |
| deviceId = std::move(deviceId), |
| devicePk = std::move(devicePk), |
| diw=std::weak_ptr(di), |
| name = std::move(name), |
| cert = std::move(cert), |
| vid, |
| connType, |
| eraseInfo](auto&& ice_config) { |
| auto sthis = w.lock(); |
| if (!sthis) { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| auto info = std::make_shared<ConnectionInfo>(); |
| auto winfo = std::weak_ptr(info); |
| ice_config.tcpEnable = true; |
| ice_config.onInitDone = [w, |
| devicePk = std::move(devicePk), |
| name = std::move(name), |
| cert = std::move(cert), |
| diw, |
| winfo = std::weak_ptr(info), |
| vid, |
| connType, |
| eraseInfo](bool ok) { |
| dht::ThreadPool::io().run([w = std::move(w), |
| devicePk = std::move(devicePk), |
| vid, |
| winfo, |
| eraseInfo, |
| connType, ok] { |
| auto sthis = w.lock(); |
| if (!ok && sthis && sthis->config_->logger) |
| sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", devicePk->getLongId()); |
| if (!sthis || !ok) { |
| eraseInfo(); |
| return; |
| } |
| sthis->connectDeviceStartIce(winfo.lock(), devicePk, vid, connType, [=](bool ok) { |
| if (!ok) { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| }); |
| }; |
| ice_config.onNegoDone = [w, |
| deviceId, |
| name, |
| cert = std::move(cert), |
| diw, |
| winfo = std::weak_ptr(info), |
| vid, |
| eraseInfo](bool ok) { |
| dht::ThreadPool::io().run([w = std::move(w), |
| deviceId = std::move(deviceId), |
| name = std::move(name), |
| cert = std::move(cert), |
| diw = std::move(diw), |
| winfo = std::move(winfo), |
| vid = std::move(vid), |
| eraseInfo = std::move(eraseInfo), |
| ok] { |
| auto sthis = w.lock(); |
| if (!ok && sthis && sthis->config_->logger) |
| sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId); |
| if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(diw, winfo.lock(), deviceId, name, vid, cert)) |
| eraseInfo(); |
| }); |
| }; |
| |
| if (auto di = diw.lock()) { |
| std::lock_guard lk(di->mtx_); |
| di->info[vid] = info; |
| } |
| std::unique_lock lk {info->mutex_}; |
| ice_config.master = false; |
| ice_config.streamsCount = 1; |
| ice_config.compCountPerStream = 1; |
| info->ice_ = sthis->config_->factory->createUTransport(""); |
| if (!info->ice_) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->error("[device {}] Unable to initialize ICE session.", deviceId); |
| eraseInfo(); |
| return; |
| } |
| // We need to detect any shutdown if the ice session is destroyed before going to the |
| // TLS session; |
| info->ice_->setOnShutdown([eraseInfo]() { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| }); |
| try { |
| info->ice_->initIceInstance(ice_config); |
| } catch (const std::exception& e) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->error("{}", e.what()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& dinfow, |
| const std::weak_ptr<ConnectionInfo>& cinfow, |
| const std::shared_ptr<MultiplexedSocket>& sock, |
| const std::string& name, |
| const dht::Value::Id& vid) |
| { |
| auto channelSock = sock->addChannel(name); |
| if (!channelSock) { |
| if (config_->logger) |
| config_->logger->error("sendChannelRequest failed - unable to create channel"); |
| if (auto info = dinfow.lock()) |
| info->executePendingOperations(vid, nullptr); |
| return; |
| } |
| channelSock->onShutdown([dinfow, name, vid] { |
| if (auto info = dinfow.lock()) |
| info->executePendingOperations(vid, nullptr); |
| }); |
| channelSock->onReady( |
| [dinfow, cinfow, wSock = std::weak_ptr(channelSock), name, vid](bool accepted) { |
| if (auto dinfo = dinfow.lock()) { |
| dinfo->executePendingOperations(vid, accepted ? wSock.lock() : nullptr, accepted); |
| // Always lock top-down cinfo->mutex |
| dht::ThreadPool::io().run([cinfow, vid]() { |
| if (auto cinfo = cinfow.lock()) { |
| std::lock_guard lk(cinfo->mutex_); |
| cinfo->cbIds_.erase(vid); |
| } |
| }); |
| } |
| }); |
| |
| ChannelRequest val; |
| val.name = channelSock->name(); |
| val.state = ChannelRequestState::REQUEST; |
| val.channel = channelSock->channel(); |
| msgpack::sbuffer buffer(256); |
| msgpack::pack(buffer, val); |
| |
| std::error_code ec; |
| int res = sock->write(CONTROL_CHANNEL, |
| reinterpret_cast<const uint8_t*>(buffer.data()), |
| buffer.size(), |
| ec); |
| if (res < 0) { |
| // TODO check if we should handle errors here |
| if (config_->logger) |
| config_->logger->error("sendChannelRequest failed - error: {}", ec.message()); |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::onPeerResponse(PeerConnectionRequest&& req) |
| { |
| auto device = req.owner->getLongId(); |
| if (auto info = infos_.getInfo(device, req.id)) { |
| if (config_->logger) |
| config_->logger->debug("[device {}] New response received", device); |
| std::lock_guard lk {info->mutex_}; |
| info->responseReceived_ = true; |
| info->response_ = std::move(req); |
| info->waitForAnswer_->expires_at(std::chrono::steady_clock::now()); |
| info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, |
| this, |
| std::placeholders::_1, |
| std::weak_ptr(info), |
| device, |
| req.id)); |
| } else { |
| if (config_->logger) |
| config_->logger->warn("[device {}] Response received, but unable to find request", device); |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk) |
| { |
| if (!dht()) |
| return; |
| dht()->listen<PeerConnectionRequest>( |
| dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()), |
| [w = weak_from_this()](PeerConnectionRequest&& req) { |
| auto shared = w.lock(); |
| if (!shared) |
| return false; |
| if (shared->isMessageTreated(req.id)) { |
| // Message already treated. Just ignore |
| return true; |
| } |
| if (req.isAnswer) { |
| if (shared->config_->logger) |
| shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId()); |
| } else { |
| if (shared->config_->logger) |
| shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId()); |
| } |
| if (req.isAnswer) { |
| shared->onPeerResponse(std::move(req)); |
| } else { |
| // Async certificate checking |
| shared->findCertificate( |
| req.from, |
| [w, req = std::move(req)]( |
| const std::shared_ptr<dht::crypto::Certificate>& cert) mutable { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| dht::InfoHash peer_h; |
| if (foundPeerDevice(cert, peer_h, shared->config_->logger)) { |
| #if TARGET_OS_IOS |
| if (shared->iOSConnectedCb_(req.connType, peer_h)) |
| return; |
| #endif |
| shared->onDhtPeerRequest(req, cert); |
| } else { |
| if (shared->config_->logger) |
| shared->config_->logger->warn( |
| "[device {}] Received request from untrusted peer", |
| req.owner->getLongId()); |
| } |
| }); |
| } |
| |
| return true; |
| }, |
| dht::Value::UserTypeFilter("peer_request")); |
| } |
| |
| void |
| ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& dinfo, |
| const std::shared_ptr<ConnectionInfo>& info, |
| bool ok, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid, |
| const std::string& name) |
| { |
| if (isDestroying_) |
| return; |
| // Note: only handle pendingCallbacks here for TLS initied by connectDevice() |
| // Note: if not initied by connectDevice() the channel name will be empty (because no channel |
| // asked yet) |
| auto isDhtRequest = name.empty(); |
| if (!ok) { |
| if (isDhtRequest) { |
| if (config_->logger) |
| config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| } else { |
| if (config_->logger) |
| config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| dinfo->executePendingOperations(vid, nullptr); |
| } |
| |
| std::unique_lock lk(dinfo->mtx_); |
| dinfo->info.erase(vid); |
| |
| if (dinfo->empty()) { |
| infos_.removeDeviceInfo(dinfo->deviceId); |
| } |
| } else { |
| // The socket is ready, store it |
| if (isDhtRequest) { |
| if (config_->logger) |
| config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}", |
| deviceId, |
| vid); |
| } else { |
| if (config_->logger) |
| config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| } |
| |
| // Note: do not remove pending there it's done in sendChannelRequest |
| std::unique_lock lk2 {dinfo->mtx_}; |
| auto pendingIds = dinfo->requestPendingOps(); |
| lk2.unlock(); |
| std::unique_lock lk {info->mutex_}; |
| addNewMultiplexedSocket(dinfo, deviceId, vid, info); |
| // Finally, open the channel and launch pending callbacks |
| lk.unlock(); |
| for (const auto& [id, name]: pendingIds) { |
| if (config_->logger) |
| config_->logger->debug("[device {}] Send request on TLS socket for channel {}", |
| deviceId, name); |
| sendChannelRequest(dinfo, info, info->socket_, name, id); |
| } |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::answerTo(IceTransport& ice, |
| const dht::Value::Id& id, |
| const std::shared_ptr<dht::crypto::PublicKey>& from) |
| { |
| // NOTE: This is a shortest version of a real SDP message to save some bits |
| auto iceAttributes = ice.getLocalAttributes(); |
| std::ostringstream icemsg; |
| icemsg << iceAttributes.ufrag << "\n"; |
| icemsg << iceAttributes.pwd << "\n"; |
| for (const auto& addr : ice.getLocalCandidates(1)) { |
| icemsg << addr << "\n"; |
| } |
| |
| // Send PeerConnection response |
| PeerConnectionRequest val; |
| val.id = id; |
| val.ice_msg = icemsg.str(); |
| val.isAnswer = true; |
| auto value = std::make_shared<dht::Value>(std::move(val)); |
| value->user_type = "peer_request"; |
| |
| if (config_->logger) |
| config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId()); |
| dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix |
| + from->getId().toString()), |
| from, |
| value, |
| [from,l=config_->logger](bool ok) { |
| if (l) |
| l->debug("[device {}] Answer to connection request: put encrypted {:s}", |
| from->getLongId(), |
| (ok ? "ok" : "failed")); |
| }); |
| } |
| |
| bool |
| ConnectionManager::Impl::onRequestStartIce(const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req) |
| { |
| if (!info) |
| return false; |
| |
| auto deviceId = req.owner->getLongId(); |
| std::unique_lock lk {info->mutex_}; |
| auto& ice = info->ice_; |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("[device {}] No ICE detected", deviceId); |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| return false; |
| } |
| |
| auto sdp = ice->parseIceCandidates(req.ice_msg); |
| answerTo(*ice, req.id, req.owner); |
| if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { |
| if (config_->logger) |
| config_->logger->error("[device {}] Start ICE failed", deviceId); |
| ice = nullptr; |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| return false; |
| } |
| return true; |
| } |
| |
| bool |
| ConnectionManager::Impl::onRequestOnNegoDone(const std::weak_ptr<DeviceInfo>& dinfo, const std::shared_ptr<ConnectionInfo>& info, const PeerConnectionRequest& req) |
| { |
| if (!info) |
| return false; |
| |
| auto deviceId = req.owner->getLongId(); |
| std::unique_lock lk {info->mutex_}; |
| auto& ice = info->ice_; |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("[device {}] No ICE detected", deviceId); |
| return false; |
| } |
| |
| // Build socket |
| auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( |
| std::move(ice)), |
| false); |
| |
| // init TLS session |
| auto ph = req.from; |
| if (config_->logger) |
| config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}", |
| deviceId, |
| req.id); |
| info->tls_ = std::make_unique<TlsSocketEndpoint>( |
| std::move(endpoint), |
| certStore(), |
| config_->ioContext, |
| identity(), |
| dhParams(), |
| [ph, deviceId, w=weak_from_this(), l=config_->logger](const dht::crypto::Certificate& cert) { |
| auto shared = w.lock(); |
| if (!shared) |
| return false; |
| if (cert.getPublicKey().getId() != ph |
| || deviceId != cert.getPublicKey().getLongId()) { |
| if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.", |
| deviceId, |
| cert.getPublicKey().getLongId()); |
| return false; |
| } |
| auto crt = shared->certStore().getCertificate(cert.getLongId().toString()); |
| if (!crt) |
| return false; |
| return crt->getPacked() == cert.getPacked(); |
| }); |
| |
| info->tls_->setOnReady( |
| [w = weak_from_this(), dinfo, winfo=std::weak_ptr(info), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) { |
| if (auto shared = w.lock()) |
| if (auto info = winfo.lock()) { |
| shared->onTlsNegotiationDone(dinfo.lock(), winfo.lock(), ok, deviceId, vid); |
| // Make another reference to info to avoid destruction (could lead to a deadlock/crash). |
| dht::ThreadPool::io().run([info = std::move(info)] {}); |
| } |
| }); |
| return true; |
| } |
| |
| void |
| ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, |
| const std::shared_ptr<dht::crypto::Certificate>& /*cert*/) |
| { |
| auto deviceId = req.owner->getLongId(); |
| if (config_->logger) |
| config_->logger->debug("[device {}] New connection request", deviceId); |
| if (!iceReqCb_ || !iceReqCb_(deviceId)) { |
| if (config_->logger) |
| config_->logger->debug("[device {}] Refusing connection", deviceId); |
| return; |
| } |
| |
| // Because the connection is accepted, create an ICE socket. |
| getIceOptions([w = weak_from_this(), req, deviceId](auto&& ice_config) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| |
| auto di = shared->infos_.createDeviceInfo(deviceId); |
| auto info = std::make_shared<ConnectionInfo>(); |
| auto wdi = std::weak_ptr(di); |
| auto winfo = std::weak_ptr(info); |
| |
| // Note: used when the ice negotiation fails to erase |
| // all stored structures. |
| auto eraseInfo = [w, wdi, id = req.id] { |
| auto shared = w.lock(); |
| if (auto di = wdi.lock()) { |
| std::unique_lock lk(di->mtx_); |
| di->info.erase(id); |
| auto ops = di->extractPendingOperations(id, nullptr); |
| if (di->empty()) { |
| if (shared) |
| shared->infos_.removeDeviceInfo(di->deviceId); |
| } |
| lk.unlock(); |
| for (const auto& op: ops) |
| op.cb(nullptr, di->deviceId); |
| if (shared && shared->connReadyCb_) |
| shared->connReadyCb_(di->deviceId, "", nullptr); |
| } |
| }; |
| |
| ice_config.master = true; |
| ice_config.streamsCount = 1; |
| ice_config.compCountPerStream = 1; // TCP |
| ice_config.tcpEnable = true; |
| ice_config.onInitDone = [w, winfo, req, eraseInfo](bool ok) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| if (!ok) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("[device {}] Unable to initialize ICE session.", req.owner->getLongId()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| |
| dht::ThreadPool::io().run( |
| [w = std::move(w), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] { |
| if (auto shared = w.lock()) { |
| if (!shared->onRequestStartIce(winfo.lock(), req)) |
| eraseInfo(); |
| } |
| }); |
| }; |
| |
| ice_config.onNegoDone = [w, wdi, winfo, req, eraseInfo](bool ok) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| if (!ok) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| |
| dht::ThreadPool::io().run( |
| [w = std::move(w), wdi = std::move(wdi), winfo = std::move(winfo), req = std::move(req), eraseInfo = std::move(eraseInfo)] { |
| if (auto shared = w.lock()) |
| if (!shared->onRequestOnNegoDone(wdi.lock(), winfo.lock(), req)) |
| eraseInfo(); |
| }); |
| }; |
| |
| // Negotiate a new ICE socket |
| { |
| std::lock_guard lk(di->mtx_); |
| di->info[req.id] = info; |
| } |
| |
| if (shared->config_->logger) |
| shared->config_->logger->debug("[device {}] Accepting connection", deviceId); |
| std::unique_lock lk {info->mutex_}; |
| info->ice_ = shared->config_->factory->createUTransport(""); |
| if (not info->ice_) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("[device {}] Unable to initialize ICE session", deviceId); |
| eraseInfo(); |
| return; |
| } |
| // We need to detect any shutdown if the ice session is destroyed before going to the TLS session; |
| info->ice_->setOnShutdown([eraseInfo]() { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| }); |
| try { |
| info->ice_->initIceInstance(ice_config); |
| } catch (const std::exception& e) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("{}", e.what()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>& dinfo, const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ConnectionInfo>& info) |
| { |
| info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, deviceId, std::move(info->tls_), config_->logger); |
| info->socket_->setOnReady( |
| [w = weak_from_this()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) { |
| if (auto sthis = w.lock()) |
| if (sthis->connReadyCb_) |
| sthis->connReadyCb_(deviceId, socket->name(), socket); |
| }); |
| info->socket_->setOnRequest([w = weak_from_this()](const std::shared_ptr<dht::crypto::Certificate>& peer, |
| const uint16_t&, |
| const std::string& name) { |
| if (auto sthis = w.lock()) |
| if (sthis->channelReqCb_) |
| return sthis->channelReqCb_(peer, name); |
| return false; |
| }); |
| info->socket_->onShutdown([dinfo, wi=std::weak_ptr(info), vid]() { |
| // Cancel current outgoing connections |
| dht::ThreadPool::io().run([dinfo, wi, vid] { |
| std::set<dht::Value::Id> ids; |
| if (auto info = wi.lock()) { |
| std::lock_guard lk(info->mutex_); |
| if (info->socket_) { |
| ids = std::move(info->cbIds_); |
| info->socket_->shutdown(); |
| } |
| } |
| if (auto deviceInfo = dinfo.lock()) { |
| std::shared_ptr<ConnectionInfo> info; |
| std::vector<PendingCb> ops; |
| std::unique_lock lk(deviceInfo->mtx_); |
| auto it = deviceInfo->info.find(vid); |
| if (it != deviceInfo->info.end()) { |
| info = std::move(it->second); |
| deviceInfo->info.erase(it); |
| } |
| for (const auto& cbId : ids) { |
| auto po = deviceInfo->extractPendingOperations(cbId, nullptr); |
| ops.insert(ops.end(), po.begin(), po.end()); |
| } |
| lk.unlock(); |
| for (auto& op : ops) |
| op.cb(nullptr, deviceInfo->deviceId); |
| } |
| }); |
| }); |
| } |
| |
| const std::shared_future<tls::DhParams> |
| ConnectionManager::Impl::dhParams() const |
| { |
| return dht::ThreadPool::computation().get<tls::DhParams>( |
| std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams")); |
| } |
| |
| bool |
| ConnectionManager::Impl::isMessageTreated(dht::Value::Id id) |
| { |
| std::lock_guard lock(messageMutex_); |
| return !treatedMessages_.add(id); |
| } |
| |
| |
| IpAddr |
| ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const |
| { |
| if (family == AF_INET) |
| return publishedIp_[0]; |
| if (family == AF_INET6) |
| return publishedIp_[1]; |
| |
| assert(family == AF_UNSPEC); |
| |
| // If family is not set, prefere IPv4 if available. It's more |
| // likely to succeed behind NAT. |
| if (publishedIp_[0]) |
| return publishedIp_[0]; |
| if (publishedIp_[1]) |
| return publishedIp_[1]; |
| return {}; |
| } |
| |
| void |
| ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr) |
| { |
| if (ip_addr.getFamily() == AF_INET) { |
| publishedIp_[0] = ip_addr; |
| } else { |
| publishedIp_[1] = ip_addr; |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb) |
| { |
| dht()->getPublicAddress([w=weak_from_this(), cb = std::move(cb)](std::vector<dht::SockAddr>&& results) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| bool hasIpv4 {false}, hasIpv6 {false}; |
| for (auto& result : results) { |
| auto family = result.getFamily(); |
| if (family == AF_INET) { |
| if (not hasIpv4) { |
| hasIpv4 = true; |
| if (shared->config_->logger) |
| shared->config_->logger->debug("Store DHT public IPv4 address: {}", result); |
| //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str()); |
| shared->setPublishedAddress(*result.get()); |
| if (shared->config_->upnpCtrl) { |
| shared->config_->upnpCtrl->setPublicAddress(*result.get()); |
| } |
| } |
| } else if (family == AF_INET6) { |
| if (not hasIpv6) { |
| hasIpv6 = true; |
| if (shared->config_->logger) |
| shared->config_->logger->debug("Store DHT public IPv6 address: {}", result); |
| shared->setPublishedAddress(*result.get()); |
| } |
| } |
| if (hasIpv4 and hasIpv6) |
| break; |
| } |
| if (cb) |
| cb(); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept |
| { |
| storeActiveIpAddress([this, cb = std::move(cb)] { |
| IceTransportOptions opts = ConnectionManager::Impl::getIceOptions(); |
| auto publishedAddr = getPublishedIpAddress(); |
| |
| if (publishedAddr) { |
| auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(), |
| publishedAddr.getFamily()); |
| if (interfaceAddr) { |
| opts.accountLocalAddr = interfaceAddr; |
| opts.accountPublicAddr = publishedAddr; |
| } |
| } |
| if (cb) |
| cb(std::move(opts)); |
| }); |
| } |
| |
| IceTransportOptions |
| ConnectionManager::Impl::getIceOptions() const noexcept |
| { |
| IceTransportOptions opts; |
| opts.factory = config_->factory; |
| opts.upnpEnable = config_->upnpEnabled; |
| opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr; |
| |
| if (config_->stunEnabled) |
| opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer)); |
| if (config_->turnEnabled) { |
| if (config_->turnCache) { |
| auto turnAddr = config_->turnCache->getResolvedTurn(); |
| if (turnAddr != std::nullopt) { |
| opts.turnServers.emplace_back(TurnServerInfo() |
| .setUri(turnAddr->toString()) |
| .setUsername(config_->turnServerUserName) |
| .setPassword(config_->turnServerPwd) |
| .setRealm(config_->turnServerRealm)); |
| } |
| } else { |
| opts.turnServers.emplace_back(TurnServerInfo() |
| .setUri(config_->turnServer) |
| .setUsername(config_->turnServerUserName) |
| .setPassword(config_->turnServerPwd) |
| .setRealm(config_->turnServerRealm)); |
| } |
| // NOTE: first test with ipv6 turn was not concluant and resulted in multiple |
| // co issues. So this needs some debug. for now just disable |
| // if (cacheTurnV6 && *cacheTurnV6) { |
| // opts.turnServers.emplace_back(TurnServerInfo() |
| // .setUri(cacheTurnV6->toString(true)) |
| // .setUsername(turnServerUserName_) |
| // .setPassword(turnServerPwd_) |
| // .setRealm(turnServerRealm_)); |
| //} |
| } |
| return opts; |
| } |
| |
| bool |
| ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt, |
| dht::InfoHash& account_id, |
| const std::shared_ptr<Logger>& logger) |
| { |
| if (not crt) |
| return false; |
| |
| auto top_issuer = crt; |
| while (top_issuer->issuer) |
| top_issuer = top_issuer->issuer; |
| |
| // Unable to self-signed device certificate |
| if (top_issuer == crt) { |
| if (logger) |
| logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId()); |
| return false; |
| } |
| |
| // Check peer certificate chain |
| // Trust store with top issuer as the only CA |
| dht::crypto::TrustList peer_trust; |
| peer_trust.add(*top_issuer); |
| if (not peer_trust.verify(*crt)) { |
| if (logger) |
| logger->warn("Found invalid peer device: {}", crt->getLongId()); |
| return false; |
| } |
| |
| // Check cached OCSP response |
| if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) { |
| if (logger) |
| logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId()); |
| return false; |
| } |
| |
| account_id = crt->issuer->getId(); |
| if (logger) |
| logger->warn("Found peer device: {} account:{} CA:{}", |
| crt->getLongId(), |
| account_id, |
| top_issuer->getId()); |
| return true; |
| } |
| |
| bool |
| ConnectionManager::Impl::findCertificate( |
| const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb) |
| { |
| if (auto cert = certStore().getCertificate(id.toString())) { |
| if (cb) |
| cb(cert); |
| } else if (cb) |
| cb(nullptr); |
| return true; |
| } |
| |
| bool |
| ConnectionManager::Impl::findCertificate(const dht::InfoHash& h, |
| std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb) |
| { |
| if (auto cert = certStore().getCertificate(h.toString())) { |
| if (cb) |
| cb(cert); |
| } else { |
| dht()->findCertificate(h, |
| [cb = std::move(cb), this]( |
| const std::shared_ptr<dht::crypto::Certificate>& crt) { |
| if (crt) |
| certStore().pinCertificate(crt); |
| if (cb) |
| cb(crt); |
| }); |
| } |
| return true; |
| } |
| |
| std::shared_ptr<ConnectionManager::Config> |
| buildDefaultConfig(dht::crypto::Identity id){ |
| auto conf = std::make_shared<ConnectionManager::Config>(); |
| conf->id = std::move(id); |
| return conf; |
| } |
| |
| ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_) |
| : pimpl_ {std::make_shared<Impl>(config_)} |
| {} |
| |
| ConnectionManager::ConnectionManager(dht::crypto::Identity id) |
| : ConnectionManager {buildDefaultConfig(id)} |
| {} |
| |
| ConnectionManager::~ConnectionManager() |
| { |
| if (pimpl_) |
| pimpl_->shutdown(); |
| } |
| |
| void |
| ConnectionManager::connectDevice(const DeviceId& deviceId, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType); |
| } |
| |
| void |
| ConnectionManager::connectDevice(const dht::InfoHash& deviceId, |
| const std::string& name, |
| ConnectCallbackLegacy cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType); |
| } |
| |
| |
| void |
| ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType); |
| } |
| |
| bool |
| ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const |
| { |
| if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { |
| std::unique_lock lk {dinfo->mtx_}; |
| return dinfo->isConnecting(name); |
| } |
| return false; |
| } |
| |
| bool |
| ConnectionManager::isConnected(const DeviceId& deviceId) const |
| { |
| if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { |
| std::unique_lock lk {dinfo->mtx_}; |
| return dinfo->getConnectedInfo() != nullptr; |
| } |
| return false; |
| } |
| |
| void |
| ConnectionManager::closeConnectionsWith(const std::string& peerUri) |
| { |
| std::vector<std::shared_ptr<DeviceInfo>> dInfos; |
| for (const auto& dinfo: pimpl_->infos_.getDeviceInfos()) { |
| std::unique_lock lk(dinfo->mtx_); |
| bool isPeer = false; |
| for (auto const& [id, cinfo]: dinfo->info) { |
| std::lock_guard lkv {cinfo->mutex_}; |
| auto tls = cinfo->tls_ ? cinfo->tls_.get() : (cinfo->socket_ ? cinfo->socket_->endpoint() : nullptr); |
| auto cert = tls ? tls->peerCertificate() : nullptr; |
| if (not cert) |
| cert = pimpl_->certStore().getCertificate(dinfo->deviceId.toString()); |
| if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) { |
| isPeer = true; |
| break; |
| } |
| } |
| lk.unlock(); |
| if (isPeer) { |
| dInfos.emplace_back(std::move(dinfo)); |
| } |
| } |
| // Stop connections to all peers devices |
| for (const auto& dinfo : dInfos) { |
| std::unique_lock lk {dinfo->mtx_}; |
| auto unused = dinfo->extractUnusedConnections(); |
| auto pending = dinfo->extractPendingOperations(0, nullptr); |
| pimpl_->infos_.removeDeviceInfo(dinfo->deviceId); |
| lk.unlock(); |
| for (auto& op : unused) |
| op->shutdown(); |
| for (auto& op : pending) |
| op.cb(nullptr, dinfo->deviceId); |
| } |
| } |
| |
| void |
| ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk) |
| { |
| pimpl_->onDhtConnected(devicePk); |
| } |
| |
| void |
| ConnectionManager::onICERequest(onICERequestCallback&& cb) |
| { |
| pimpl_->iceReqCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb) |
| { |
| pimpl_->channelReqCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb) |
| { |
| pimpl_->connReadyCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb) |
| { |
| pimpl_->iOSConnectedCb_ = std::move(cb); |
| } |
| |
| std::size_t |
| ConnectionManager::activeSockets() const |
| { |
| return pimpl_->infos_.getConnectedInfos().size(); |
| } |
| |
| void |
| ConnectionManager::monitor() const |
| { |
| auto logger = pimpl_->config_->logger; |
| if (!logger) |
| return; |
| logger->debug("ConnectionManager current status:"); |
| for (const auto& ci : pimpl_->infos_.getConnectedInfos()) { |
| std::lock_guard lk(ci->mutex_); |
| if (ci->socket_) |
| ci->socket_->monitor(); |
| } |
| logger->debug("ConnectionManager end status."); |
| } |
| |
| void |
| ConnectionManager::connectivityChanged() |
| { |
| for (const auto& ci : pimpl_->infos_.getConnectedInfos()) { |
| std::lock_guard lk(ci->mutex_); |
| if (ci->socket_) |
| dht::ThreadPool::io().run([s = ci->socket_] { s->sendBeacon(); }); |
| } |
| } |
| |
| void |
| ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept |
| { |
| return pimpl_->getIceOptions(std::move(cb)); |
| } |
| |
| IceTransportOptions |
| ConnectionManager::getIceOptions() const noexcept |
| { |
| return pimpl_->getIceOptions(); |
| } |
| |
| IpAddr |
| ConnectionManager::getPublishedIpAddress(uint16_t family) const |
| { |
| return pimpl_->getPublishedIpAddress(family); |
| } |
| |
| void |
| ConnectionManager::setPublishedAddress(const IpAddr& ip_addr) |
| { |
| return pimpl_->setPublishedAddress(ip_addr); |
| } |
| |
| void |
| ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb) |
| { |
| return pimpl_->storeActiveIpAddress(std::move(cb)); |
| } |
| |
| std::shared_ptr<ConnectionManager::Config> |
| ConnectionManager::getConfig() |
| { |
| return pimpl_->config_; |
| } |
| |
| std::vector<std::map<std::string, std::string>> |
| ConnectionManager::getConnectionList(const DeviceId& device) const |
| { |
| std::vector<std::map<std::string, std::string>> connectionsList; |
| if (device) { |
| if (auto deviceInfo = pimpl_->infos_.getDeviceInfo(device)) { |
| connectionsList = deviceInfo->getConnectionList(pimpl_->certStore()); |
| } |
| } else { |
| for (const auto& deviceInfo : pimpl_->infos_.getDeviceInfos()) { |
| auto cl = deviceInfo->getConnectionList(pimpl_->certStore()); |
| connectionsList.insert(connectionsList.end(), std::make_move_iterator(cl.begin()), std::make_move_iterator(cl.end())); |
| } |
| } |
| return connectionsList; |
| } |
| |
| std::vector<std::map<std::string, std::string>> |
| ConnectionManager::getChannelList(const std::string& connectionId) const |
| { |
| auto [deviceId, valueId] = parseCallbackId(connectionId); |
| if (auto info = pimpl_->infos_.getInfo(deviceId, valueId)) { |
| std::lock_guard lk(info->mutex_); |
| if (info->socket_) |
| return info->socket_->getChannelList(); |
| } |
| return {}; |
| } |
| |
| } // namespace dhtnet |