blob: a860ec3c34acfd5d48c3e253bcca2c3f90021aa3 [file] [log] [blame]
/*
 *  Copyright (C) 2004-2023 Savoir-faire Linux Inc.
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
Adrien Béraud612b55b2023-05-29 10:42:04 -040017#include "ice_transport.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040018#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040019#include "ice_socket.h"
20#include "sip_utils.h"
21#include "string_utils.h"
22#include "upnp/upnp_control.h"
23#include "transport/peer_channel.h"
24#include "tracepoint/tracepoint.h"
25
26#include <opendht/logger.h>
27#include <opendht/utils.h>
28
Adrien Béraud9e0f84f2023-07-27 16:02:21 -040029extern "C" {
Adrien Béraud612b55b2023-05-29 10:42:04 -040030#include <pjlib.h>
Adrien Béraud9e0f84f2023-07-27 16:02:21 -040031}
Adrien Béraud612b55b2023-05-29 10:42:04 -040032
33#include <map>
34#include <atomic>
35#include <queue>
36#include <mutex>
37#include <condition_variable>
38#include <thread>
39#include <utility>
40#include <tuple>
41#include <algorithm>
42#include <sstream>
43#include <chrono>
Adrien Béraud67ff0772023-07-12 11:22:29 -040044#include <memory>
Adrien Béraud612b55b2023-05-29 10:42:04 -040045#include <cerrno>
46
47#include "pj/limits.h"
48
// Evaluate a PJSIP call exactly once and throw std::runtime_error
// ("<expression> failed") unless it returned PJ_SUCCESS.
#define TRY(ret) \
    do { \
        const auto tryStatus_ = (ret); \
        if (tryStatus_ == PJ_SUCCESS) \
            break; \
        throw std::runtime_error(#ret " failed"); \
    } while (0)
54
// Component IDs are 1-based: accept only 1..compCount, otherwise throw
// std::runtime_error carrying the offending ID.
#define ASSERT_COMP_ID(compId, compCount) \
    do { \
        const bool compIdInRange_ = (compId) != 0 && (compId) <= (compCount); \
        if (!compIdInRange_) \
            throw std::runtime_error("Invalid component ID " + (std::to_string(compId))); \
    } while (0)
61
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040062namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040063
/// Value assigned to the max_pkt_size field of every STUN/TURN transport config.
static constexpr unsigned STUN_MAX_PACKET_SIZE = 8192;
/// Size in bytes of IPV6 packet header
static constexpr uint16_t IPV6_HEADER_SIZE {40};
/// Size in bytes of IPV4 packet header
static constexpr uint16_t IPV4_HEADER_SIZE {20};
static constexpr int MAX_CANDIDATES = 32;
/// Upper bound, in milliseconds, on the time spent flushing timers at destruction.
static constexpr int MAX_DESTRUCTION_TIMEOUT = 3000;
/// Maximum duration, in milliseconds, of one event-handling pass.
static constexpr int HANDLE_EVENT_DURATION = 500;
// NOTE(review): presumably the maximum wait for UPnP port mappings - confirm at the use site.
static constexpr std::chrono::seconds PORT_MAPPING_TIMEOUT = std::chrono::seconds(4);
Adrien Béraud612b55b2023-05-29 10:42:04 -040071//==============================================================================
72
Adrien Béraud612b55b2023-05-29 10:42:04 -040073using namespace upnp;
74
75//==============================================================================
76
77class IceLock
78{
79 pj_grp_lock_t* lk_;
80
81public:
82 IceLock(pj_ice_strans* strans)
83 : lk_(pj_ice_strans_get_grp_lock(strans))
84 {
85 lock();
86 }
87
88 ~IceLock() { unlock(); }
89
90 void lock() { if (lk_) pj_grp_lock_acquire(lk_); }
91
92 void unlock() { if (lk_) pj_grp_lock_release(lk_); }
93};
94
95class IceTransport::Impl
96{
97public:
Adrien Béraud89933c12023-07-26 14:53:30 -040098 Impl(std::string_view name, const std::shared_ptr<Logger>& logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -040099 ~Impl();
100
101 void initIceInstance(const IceTransportOptions& options);
102
103 void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status);
104
105 void onReceiveData(unsigned comp_id, void* pkt, pj_size_t size);
106
107 /**
108 * Set/change transport role as initiator.
109 * Should be called before start method.
110 */
111 bool setInitiatorSession();
112
113 /**
114 * Set/change transport role as slave.
115 * Should be called before start method.
116 */
117 bool setSlaveSession();
118 bool createIceSession(pj_ice_sess_role role);
119
120 void getUFragPwd();
121
122 std::string link() const;
123
124 bool _isInitialized() const;
125 bool _isStarted() const;
126 bool _isRunning() const;
127 bool _isFailed() const;
128 bool _waitForInitialization(std::chrono::milliseconds timeout);
129
130 const pj_ice_sess_cand* getSelectedCandidate(unsigned comp_id, bool remote) const;
131 IpAddr getLocalAddress(unsigned comp_id) const;
132 IpAddr getRemoteAddress(unsigned comp_id) const;
133 static const char* getCandidateType(const pj_ice_sess_cand* cand);
134 bool isTcpEnabled() const { return config_.protocol == PJ_ICE_TP_TCP; }
135 bool addStunConfig(int af);
136 void requestUpnpMappings();
137 bool hasUpnp() const;
138 // Take a list of address pairs (local/public) and add them as
139 // reflexive candidates using STUN config.
140 void addServerReflexiveCandidates(const std::vector<std::pair<IpAddr, IpAddr>>& addrList);
141 // Generate server reflexive candidates using the published (DHT/Account) address
142 std::vector<std::pair<IpAddr, IpAddr>> setupGenericReflexiveCandidates();
143 // Generate server reflexive candidates using UPNP mappings.
144 std::vector<std::pair<IpAddr, IpAddr>> setupUpnpReflexiveCandidates();
145 void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
146 IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
147 bool handleEvents(unsigned max_msec);
148 int flushTimerHeapAndIoQueue();
149 int checkEventQueue(int maxEventToPoll);
150
151 std::shared_ptr<dht::log::Logger> logger_ {};
Amna81221ad2023-09-14 17:33:26 -0400152 std::shared_ptr<dhtnet::IceTransportFactory> factory {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400153
154 std::condition_variable_any iceCV_ {};
155
156 std::string sessionName_ {};
157 std::unique_ptr<pj_pool_t, decltype(&pj_pool_release)> pool_ {nullptr, pj_pool_release};
158 bool isTcp_ {false};
159 bool upnpEnabled_ {false};
160 IceTransportCompleteCb on_initdone_cb_ {};
161 IceTransportCompleteCb on_negodone_cb_ {};
162 pj_ice_strans* icest_ {nullptr};
163 unsigned streamsCount_ {0};
164 unsigned compCountPerStream_ {0};
165 unsigned compCount_ {0};
166 std::string local_ufrag_ {};
167 std::string local_pwd_ {};
168 pj_sockaddr remoteAddr_ {};
169 pj_ice_strans_cfg config_ {};
170 //std::string last_errmsg_ {};
171
172 std::atomic_bool is_stopped_ {false};
173
Adrien Béraud21d5f462023-08-27 12:06:21 -0400174 using Packet = std::vector<uint8_t>;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400175
176 struct ComponentIO
177 {
178 std::mutex mutex;
179 std::condition_variable cv;
180 std::deque<Packet> queue;
181 IceRecvCb recvCb;
182 };
183
184 // NOTE: Component IDs start from 1, while these three vectors
185 // are indexed from 0. Conversion from ID to vector index must
186 // be done properly.
187 std::vector<ComponentIO> compIO_ {};
188 std::vector<PeerChannel> peerChannels_ {};
189 std::vector<IpAddr> iceDefaultRemoteAddr_;
190
191 // ICE controlling role. True for controller agents and false for
192 // controlled agents
193 std::atomic_bool initiatorSession_ {true};
194
195 // Local/Public addresses used by the account owning the ICE instance.
196 IpAddr accountLocalAddr_ {};
197 IpAddr accountPublicAddr_ {};
198
199 // STUN and TURN servers
200 std::vector<StunServerInfo> stunServers_;
201 std::vector<TurnServerInfo> turnServers_;
202
203 /**
204 * Returns the IP of each candidate for a given component in the ICE session
205 */
206 struct LocalCandidate
207 {
208 IpAddr addr;
209 pj_ice_cand_transport transport;
210 };
211
212 std::shared_ptr<upnp::Controller> upnp_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400213 std::map<Mapping::key_t, Mapping> upnpMappings_;
214 std::mutex upnpMappingsMutex_ {};
215
Amna49da80f2024-08-26 16:54:40 -0400216 std::mutex upnpMutex_ {};
217 std::condition_variable upnpCv_;
218
Adrien Béraud612b55b2023-05-29 10:42:04 -0400219 bool onlyIPv4Private_ {true};
220
221 // IO/Timer events are handled by following thread
222 std::thread thread_ {};
223 std::atomic_bool threadTerminateFlags_ {false};
224
225 // Wait data on components
226 mutable std::mutex sendDataMutex_ {};
227 std::condition_variable waitDataCv_ = {};
228 pj_size_t lastSentLen_ {0};
229 bool destroying_ {false};
230 onShutdownCb scb {};
231
232 void cancelOperations()
233 {
234 for (auto& c : peerChannels_)
235 c.stop();
Adrien Béraud024c46f2024-03-02 23:53:18 -0500236 std::lock_guard lk(sendDataMutex_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400237 destroying_ = true;
238 waitDataCv_.notify_all();
239 }
240};
241
242//==============================================================================
243
244/**
245 * Add stun/turn configuration or default host as candidates
246 */
247
248static void
Sébastien Blincf569402023-07-27 09:46:40 -0400249add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info, const std::shared_ptr<dht::log::Logger>& logger)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400250{
251 if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN)
252 throw std::runtime_error("Too many STUN configurations");
253
254 IpAddr ip {info.uri};
255
256 // Given URI cannot be DNS resolved or not IPv4 or IPv6?
257 // This prevents a crash into PJSIP when ip.toString() is called.
258 if (ip.getFamily() == AF_UNSPEC) {
259 /*JAMI_DBG("[ice (%s)] STUN server '%s' not used, unresolvable address",
260 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
261 info.uri.c_str());*/
262 return;
263 }
264
265 auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++];
266 pj_ice_strans_stun_cfg_default(&stun);
267 pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str());
268 stun.af = ip.getFamily();
269 if (!(stun.port = ip.getPort()))
270 stun.port = PJ_STUN_PORT;
271 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
272 stun.conn_type = cfg.stun.conn_type;
Sébastien Blincf569402023-07-27 09:46:40 -0400273 if (logger)
274 logger->debug("added stun server '{}', port {}", pj_strbuf(&stun.server), stun.port);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400275}
276
277static void
Sébastien Blincf569402023-07-27 09:46:40 -0400278add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info, const std::shared_ptr<dht::log::Logger>& logger)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400279{
280 if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN)
281 throw std::runtime_error("Too many TURN servers");
282
283 IpAddr ip {info.uri};
284
285 // Same comment as add_stun_server()
286 if (ip.getFamily() == AF_UNSPEC) {
Sébastien Blincf569402023-07-27 09:46:40 -0400287 if (logger)
288 logger->debug("TURN server '{}' not used, unresolvable address", info.uri);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400289 return;
290 }
291
292 auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++];
293 pj_ice_strans_turn_cfg_default(&turn);
294 pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str());
295 turn.af = ip.getFamily();
296 if (!(turn.port = ip.getPort()))
297 turn.port = PJ_STUN_PORT;
298 turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
299 turn.conn_type = cfg.turn.conn_type;
300
301 // Authorization (only static plain password supported yet)
302 if (not info.password.empty()) {
303 turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
304 turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
305 pj_strset(&turn.auth_cred.data.static_cred.realm,
306 (char*) info.realm.c_str(),
307 info.realm.size());
308 pj_strset(&turn.auth_cred.data.static_cred.username,
309 (char*) info.username.c_str(),
310 info.username.size());
311 pj_strset(&turn.auth_cred.data.static_cred.data,
312 (char*) info.password.c_str(),
313 info.password.size());
314 }
Sébastien Blincf569402023-07-27 09:46:40 -0400315 if (logger)
316 logger->debug("added turn server '{}', port {}", pj_strbuf(&turn.server), turn.port);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400317}
318
319//==============================================================================
320
Adrien Béraud89933c12023-07-26 14:53:30 -0400321IceTransport::Impl::Impl(std::string_view name, const std::shared_ptr<Logger>& logger)
322 : logger_(logger), sessionName_(name)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400323{
324 if (logger_)
Sébastien Blincf569402023-07-27 09:46:40 -0400325 logger_->debug("[ice:{}] Creating IceTransport session for \"{:s}\"", fmt::ptr(this), sessionName_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400326}
327
328IceTransport::Impl::~Impl()
329{
Adrien Béraud612b55b2023-05-29 10:42:04 -0400330 threadTerminateFlags_ = true;
331
332 if (thread_.joinable()) {
333 thread_.join();
334 }
335
336 if (icest_) {
337 pj_ice_strans* strans = nullptr;
338
339 std::swap(strans, icest_);
340
341 // must be done before ioqueue/timer destruction
342 if (logger_)
343 logger_->debug("[ice:{}] Destroying ice_strans {}", pj_ice_strans_get_user_data(strans), fmt::ptr(strans));
344
345 pj_ice_strans_stop_ice(strans);
346 pj_ice_strans_destroy(strans);
347
348 // NOTE: This last timer heap and IO queue polling is necessary to close
349 // TURN socket.
350 // Because when destroying the TURN session pjproject creates a pj_timer
351 // to postpone the TURN destruction. This timer is only called if we poll
352 // the event queue.
353
354 int ret = flushTimerHeapAndIoQueue();
355
356 if (ret < 0) {
357 if (logger_)
358 logger_->error("[ice:{}] IO queue polling failed", fmt::ptr(this));
359 } else if (ret > 0) {
360 if (logger_)
361 logger_->error("[ice:{}] Unexpected left timer in timer heap. "
362 "Please report the bug",
363 fmt::ptr(this));
364 }
365
366 if (checkEventQueue(1) > 0) {
367 if (logger_)
368 logger_->warn("[ice:{}] Unexpected left events in IO queue", fmt::ptr(this));
369 }
370
371 if (config_.stun_cfg.ioqueue)
372 pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
373
374 if (config_.stun_cfg.timer_heap)
375 pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
376 }
377
Adrien Béraud612b55b2023-05-29 10:42:04 -0400378 if (scb)
379 scb();
380}
381
382void
383IceTransport::Impl::initIceInstance(const IceTransportOptions& options)
384{
Amna81221ad2023-09-14 17:33:26 -0400385 factory = options.factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400386 isTcp_ = options.tcpEnable;
387 upnpEnabled_ = options.upnpEnable;
388 on_initdone_cb_ = options.onInitDone;
389 on_negodone_cb_ = options.onNegoDone;
390 streamsCount_ = options.streamsCount;
391 compCountPerStream_ = options.compCountPerStream;
392 compCount_ = streamsCount_ * compCountPerStream_;
393 compIO_ = std::vector<ComponentIO>(compCount_);
394 peerChannels_ = std::vector<PeerChannel>(compCount_);
395 iceDefaultRemoteAddr_.resize(compCount_);
396 initiatorSession_ = options.master;
397 accountLocalAddr_ = std::move(options.accountLocalAddr);
398 accountPublicAddr_ = std::move(options.accountPublicAddr);
399 stunServers_ = std::move(options.stunServers);
400 turnServers_ = std::move(options.turnServers);
401
402 if (logger_)
403 logger_->debug("[ice:{}] Initializing the session - comp count {} - as a {}",
404 fmt::ptr(this),
405 compCount_,
406 initiatorSession_ ? "master" : "slave");
407
Sébastien Blin21cbcfe2023-10-24 14:45:58 -0400408 if (upnpEnabled_) {
409 if (options.upnpContext) {
410 upnp_ = std::make_shared<upnp::Controller>(options.upnpContext);
411 } else if (logger_) {
412 logger_->error("[ice:{}] UPnP enabled, but no context found", fmt::ptr(this));
413 }
414 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400415
Amna81221ad2023-09-14 17:33:26 -0400416 config_ = factory->getIceCfg(); // config copy
Adrien Béraud612b55b2023-05-29 10:42:04 -0400417 if (isTcp_) {
418 config_.protocol = PJ_ICE_TP_TCP;
419 config_.stun.conn_type = PJ_STUN_TP_TCP;
420 config_.turn.conn_type = PJ_TURN_TP_TCP;
421 } else {
422 config_.protocol = PJ_ICE_TP_UDP;
423 config_.stun.conn_type = PJ_STUN_TP_UDP;
424 config_.turn.conn_type = PJ_TURN_TP_UDP;
425 }
Adrien Béraud738aedb2024-02-18 14:56:26 -0500426 if (options.qosType.size() == 1) {
427 config_.stun.cfg.qos_type = (pj_qos_type)options.qosType[0];
428 config_.turn.cfg.qos_type = (pj_qos_type)options.qosType[0];
429 }
430 if (options.qosType.size() == compCount_) {
431 for (unsigned i = 0; i < compCount_; ++i) {
432 config_.comp[i].qos_type = (pj_qos_type)(options.qosType[i]);
433 }
434 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400435
436 pool_.reset(
Amna81221ad2023-09-14 17:33:26 -0400437 pj_pool_create(factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400438 if (not pool_)
439 throw std::runtime_error("pj_pool_create() failed");
440
441 // Note: For server reflexive candidates, UPNP mappings will
442 // be used if available. Then, the public address learnt during
443 // the account registration process will be added only if it
444 // differs from the UPNP public address.
445 // Also note that UPNP candidates should be added first in order
446 // to have a higher priority when performing the connectivity
447 // checks.
448 // STUN configs layout:
449 // - index 0 : host IPv4
450 // - index 1 : host IPv6
451 // - index 2 : upnp/generic srflx IPv4.
452 // - index 3 : generic srflx (if upnp exists and different)
453
454 config_.stun_tp_cnt = 0;
455
Adrien Béraud370257c2023-08-15 20:53:09 -0400456 // if (logger_)
457 // logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400458 addStunConfig(pj_AF_INET());
459 addStunConfig(pj_AF_INET6());
460
461 std::vector<std::pair<IpAddr, IpAddr>> upnpSrflxCand;
462 if (upnp_) {
463 requestUpnpMappings();
464 upnpSrflxCand = setupUpnpReflexiveCandidates();
465 if (not upnpSrflxCand.empty()) {
466 addServerReflexiveCandidates(upnpSrflxCand);
Adrien Béraud370257c2023-08-15 20:53:09 -0400467 // if (logger_)
468 // logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400469 }
470 }
471
472 auto genericSrflxCand = setupGenericReflexiveCandidates();
473
474 if (not genericSrflxCand.empty()) {
475 // Generic srflx candidates will be added only if different
476 // from upnp candidates.
477 if (upnpSrflxCand.empty()
478 or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) {
479 addServerReflexiveCandidates(genericSrflxCand);
Adrien Béraud370257c2023-08-15 20:53:09 -0400480 // if (logger_)
481 // logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400482 }
483 }
484
485 if (upnpSrflxCand.empty() and genericSrflxCand.empty()) {
486 if (logger_)
487 logger_->warn("[ice:{}] No server reflexive candidates added", fmt::ptr(this));
488 }
489
490 pj_ice_strans_cb icecb;
491 pj_bzero(&icecb, sizeof(icecb));
492
493 icecb.on_rx_data = [](pj_ice_strans* ice_st,
494 unsigned comp_id,
495 void* pkt,
496 pj_size_t size,
497 const pj_sockaddr_t* /*src_addr*/,
498 unsigned /*src_addr_len*/) {
499 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
500 tr->onReceiveData(comp_id, pkt, size);
501 };
502
503 icecb.on_ice_complete = [](pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) {
504 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
505 tr->onComplete(ice_st, op, status);
506 };
507
508 if (isTcp_) {
509 icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
510 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
511 std::lock_guard lk(tr->sendDataMutex_);
512 tr->lastSentLen_ += size;
513 tr->waitDataCv_.notify_all();
514 }
515 };
516 }
517
518 icecb.on_destroy = [](pj_ice_strans* ice_st) {
519 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
520 tr->cancelOperations(); // Avoid upper layer to manage this ; Stop read operations
521 };
522
523 // Add STUN servers
524 for (auto& server : stunServers_)
Sébastien Blincf569402023-07-27 09:46:40 -0400525 add_stun_server(*pool_, config_, server, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400526
527 // Add TURN servers
528 for (auto& server : turnServers_)
Sébastien Blincf569402023-07-27 09:46:40 -0400529 add_turn_server(*pool_, config_, server, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400530
531 static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64);
532 TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap));
533 TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue));
534 std::ostringstream sessionName {};
535 // We use the instance pointer as the PJNATH session name in order
536 // to easily identify the logs reported by PJNATH.
537 sessionName << this;
538 pj_status_t status = pj_ice_strans_create(sessionName.str().c_str(),
539 &config_,
540 compCount_,
541 this,
542 &icecb,
543 &icest_);
544
545 if (status != PJ_SUCCESS || icest_ == nullptr) {
546 throw std::runtime_error("pj_ice_strans_create() failed");
547 }
548
549 // Must be created after any potential failure
550 thread_ = std::thread([this] {
551 while (not threadTerminateFlags_) {
552 // NOTE: handleEvents can return false in this case
553 // but here we don't care if there is event or not.
554 handleEvents(HANDLE_EVENT_DURATION);
555 }
556 });
557}
558
559bool
560IceTransport::Impl::_isInitialized() const
561{
562 if (auto *icest = icest_) {
563 auto state = pj_ice_strans_get_state(icest);
564 return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
565 }
566 return false;
567}
568
569bool
570IceTransport::Impl::_isStarted() const
571{
572 if (auto *icest = icest_) {
573 auto state = pj_ice_strans_get_state(icest);
574 return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
575 }
576 return false;
577}
578
579bool
580IceTransport::Impl::_isRunning() const
581{
582 if (auto *icest = icest_) {
583 auto state = pj_ice_strans_get_state(icest);
584 return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
585 }
586 return false;
587}
588
589bool
590IceTransport::Impl::_isFailed() const
591{
592 if (auto *icest = icest_)
593 return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
594 return false;
595}
596
597bool
598IceTransport::Impl::handleEvents(unsigned max_msec)
599{
600 // By tests, never seen more than two events per 500ms
601 static constexpr auto MAX_NET_EVENTS = 2;
602
603 pj_time_val max_timeout = {0, static_cast<long>(max_msec)};
604 pj_time_val timeout = {0, 0};
605 unsigned net_event_count = 0;
606
607 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
608 auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32;
609
610 // timeout limitation
611 if (hasActiveTimer)
612 pj_time_val_normalize(&timeout);
613
614 if (PJ_TIME_VAL_GT(timeout, max_timeout)) {
615 timeout = max_timeout;
616 }
617
618 do {
619 auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
620
621 // timeout
622 if (not n_events)
623 return hasActiveTimer;
624
625 // error
626 if (n_events < 0) {
627 const auto err = pj_get_os_error();
628 // Kept as debug as some errors are "normal" in regular context
629 if (logger_)
630 logger_->debug("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
631 std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
632 return hasActiveTimer;
633 }
634
635 net_event_count += n_events;
636 timeout.sec = timeout.msec = 0;
637 } while (net_event_count < MAX_NET_EVENTS);
638 return hasActiveTimer;
639}
640
641int
642IceTransport::Impl::flushTimerHeapAndIoQueue()
643{
644 pj_time_val timerTimeout = {0, 0};
645 pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION};
646 bool hasActiveTimer = false;
647 std::chrono::milliseconds totalWaitTime {0};
Adrien Béraud9a676812023-08-21 09:02:16 -0400648 // auto const start = std::chrono::steady_clock::now();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400649 // We try to process pending events as fast as possible to
650 // speed-up the release.
651 int maxEventToProcess = 10;
652
653 do {
654 if (checkEventQueue(maxEventToProcess) < 0)
655 return -1;
656
657 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout);
658 hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32);
659
660 if (hasActiveTimer) {
661 pj_time_val_normalize(&timerTimeout);
662 auto waitTime = std::chrono::milliseconds(
663 std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime)));
664 std::this_thread::sleep_for(waitTime);
665 totalWaitTime += waitTime;
666 }
667 } while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT));
668
Adrien Béraud9a676812023-08-21 09:02:16 -0400669 // auto duration = std::chrono::steady_clock::now() - start;
670 // if (logger_)
671 // logger_->debug("[ice:{}] Timer heap flushed after {}", fmt::ptr(this), dht::print_duration(duration));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400672
673 return static_cast<int>(pj_timer_heap_count(config_.stun_cfg.timer_heap));
674}
675
676int
677IceTransport::Impl::checkEventQueue(int maxEventToPoll)
678{
679 pj_time_val timeout = {0, 0};
680 int eventCount = 0;
681 int events = 0;
682
683 do {
684 events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
685 if (events < 0) {
686 const auto err = pj_get_os_error();
687 if (logger_)
688 logger_->error("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
689 return events;
690 }
691
692 eventCount += events;
693
694 } while (events > 0 && eventCount < maxEventToPoll);
695
696 return eventCount;
697}
698
699void
700IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status)
701{
702 const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization"
703 : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation"
704 : "unknown_op";
705
706 const bool done = status == PJ_SUCCESS;
707 if (done) {
708 if (logger_)
709 logger_->debug("[ice:{}] {:s} {:s} success",
710 fmt::ptr(this),
711 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
712 opname);
713 } else {
714 if (logger_)
715 logger_->error("[ice:{}] {:s} {:s} failed: {:s}",
716 fmt::ptr(this),
717 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
718 opname,
719 sip_utils::sip_strerror(status));
720 }
721
722 if (done and op == PJ_ICE_STRANS_OP_INIT) {
723 if (initiatorSession_)
724 setInitiatorSession();
725 else
726 setSlaveSession();
727 }
728
729 if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
730 on_initdone_cb_(done);
731 else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) {
732 if (done) {
733 // Dump of connection pairs
734 if (logger_)
735 logger_->debug("[ice:{}] {:s} connection pairs ([comp id] local [type] <-> remote [type]):\n{:s}",
736 fmt::ptr(this),
737 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
738 link());
739 }
740 if (on_negodone_cb_)
741 on_negodone_cb_(done);
742 }
743
744 iceCV_.notify_all();
745}
746
747std::string
748IceTransport::Impl::link() const
749{
750 std::ostringstream out;
751 for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) {
752 auto absIdx = strm;
753 auto comp = (strm + 1) / compCountPerStream_;
754 auto laddr = getLocalAddress(absIdx);
755 auto raddr = getRemoteAddress(absIdx);
756
757 if (laddr and laddr.getPort() != 0 and raddr and raddr.getPort() != 0) {
758 out << " [" << comp << "] " << laddr.toString(true, true) << " ["
759 << getCandidateType(getSelectedCandidate(absIdx, false)) << "] "
760 << " <-> " << raddr.toString(true, true) << " ["
761 << getCandidateType(getSelectedCandidate(absIdx, true)) << "] " << '\n';
762 } else {
763 out << " [" << comp << "] disabled\n";
764 }
765 }
766 return out.str();
767}
768
769bool
770IceTransport::Impl::setInitiatorSession()
771{
772 if (logger_)
773 logger_->debug("[ice:{}] as master", fmt::ptr(this));
774 initiatorSession_ = true;
775 if (_isInitialized()) {
776 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING);
777 if (status != PJ_SUCCESS) {
778 if (logger_)
779 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
780 return false;
781 }
782 return true;
783 }
784 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING);
785}
786
787bool
788IceTransport::Impl::setSlaveSession()
789{
790 if (logger_)
791 logger_->debug("[ice:{}] as slave", fmt::ptr(this));
792 initiatorSession_ = false;
793 if (_isInitialized()) {
794 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED);
795 if (status != PJ_SUCCESS) {
796 if (logger_)
797 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
798 return false;
799 }
800 return true;
801 }
802 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED);
803}
804
805const pj_ice_sess_cand*
806IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const
807{
808 ASSERT_COMP_ID(comp_id, compCount_);
809
810 // Return the selected candidate pair. Might not be the nominated pair if
811 // ICE has not concluded yet, but should be the nominated pair afterwards.
812 if (not _isRunning()) {
813 if (logger_)
814 logger_->error("[ice:{}] ICE transport is not running", fmt::ptr(this));
815 return nullptr;
816 }
817
818 const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id);
819 if (sess == nullptr) {
820 if (logger_)
821 logger_->warn("[ice:{}] Component {} has no valid pair (disabled)", fmt::ptr(this), comp_id);
822 return nullptr;
823 }
824
825 if (remote)
826 return sess->rcand;
827 else
828 return sess->lcand;
829}
830
831IpAddr
832IceTransport::Impl::getLocalAddress(unsigned comp_id) const
833{
834 ASSERT_COMP_ID(comp_id, compCount_);
835
836 if (auto cand = getSelectedCandidate(comp_id, false))
837 return cand->addr;
838
839 return {};
840}
841
842IpAddr
843IceTransport::Impl::getRemoteAddress(unsigned comp_id) const
844{
845 ASSERT_COMP_ID(comp_id, compCount_);
846
847 if (auto cand = getSelectedCandidate(comp_id, true))
848 return cand->addr;
849
850 return {};
851}
852
853const char*
854IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand)
855{
856 auto name = cand ? pj_ice_get_cand_type_name(cand->type) : nullptr;
857 return name ? name : "?";
858}
859
860void
861IceTransport::Impl::getUFragPwd()
862{
863 if (icest_) {
864 pj_str_t local_ufrag, local_pwd;
865
866 pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr);
867 local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
868 local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
869 }
870}
871
872bool
873IceTransport::Impl::createIceSession(pj_ice_sess_role role)
874{
875 if (not icest_) {
876 return false;
877 }
878
879 if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) {
880 if (logger_)
881 logger_->error("[ice:{}] pj_ice_strans_init_ice() failed", fmt::ptr(this));
882 return false;
883 }
884
885 // Fetch some information on local configuration
886 getUFragPwd();
887
888 if (logger_)
Sébastien Blincf569402023-07-27 09:46:40 -0400889 logger_->debug("[ice:{}] (local) ufrag={}, pwd={}", fmt::ptr(this), local_ufrag_, local_pwd_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400890
891 return true;
892}
893
894bool
895IceTransport::Impl::addStunConfig(int af)
896{
897 if (config_.stun_tp_cnt >= PJ_ICE_MAX_STUN) {
898 if (logger_)
Sébastien Blincf569402023-07-27 09:46:40 -0400899 logger_->error("Max number of STUN configurations reached ({})", PJ_ICE_MAX_STUN);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400900 return false;
901 }
902
903 if (af != pj_AF_INET() and af != pj_AF_INET6()) {
904 if (logger_)
Sébastien Blincf569402023-07-27 09:46:40 -0400905 logger_->error("Invalid address familly ({})", af);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400906 return false;
907 }
908
909 auto& stun = config_.stun_tp[config_.stun_tp_cnt++];
910
911 pj_ice_strans_stun_cfg_default(&stun);
912 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
913 stun.af = af;
914 stun.conn_type = config_.stun.conn_type;
915
Adrien Béraud370257c2023-08-15 20:53:09 -0400916 // if (logger_)
917 // logger_->debug("[ice:{}] added host stun config for {:s} transport",
918 // fmt::ptr(this),
919 // config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400920
921 return true;
922}
923
924void
925IceTransport::Impl::requestUpnpMappings()
926{
Adrien Béraud612b55b2023-05-29 10:42:04 -0400927 if (not upnp_)
928 return;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400929 auto transport = isTcpEnabled() ? PJ_CAND_TCP_PASSIVE : PJ_CAND_UDP;
930 auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP;
931
Amna49da80f2024-08-26 16:54:40 -0400932 // Use a different map instead of upnpMappings_ to store pointers to the mappings
933 auto upnpMappings = std::make_shared<std::map<Mapping::key_t, Mapping::sharedPtr_t>>();
934 auto isFailed = std::make_shared<bool>(false);
935
936 std::unique_lock lock(upnpMutex_);
937
Adrien Béraud612b55b2023-05-29 10:42:04 -0400938 // Request upnp mapping for each component.
939 for (unsigned id = 1; id <= compCount_; id++) {
940 // Set port number to 0 to get any available port.
941 Mapping requestedMap(portType);
942
Amna49da80f2024-08-26 16:54:40 -0400943 requestedMap.setNotifyCallback([upnpMappings, isFailed, this](Mapping::sharedPtr_t mapPtr) {
944 // Ignore intermidiate states : PENDING, IN_PROGRESS
945 // only OPEN and FAILED are considered
Adrien Béraud612b55b2023-05-29 10:42:04 -0400946
Amna49da80f2024-08-26 16:54:40 -0400947 // if the mapping is open check the validity
948 if ((mapPtr->getState() == MappingState::OPEN)) {
949 if (mapPtr->getMapKey() and mapPtr->hasValidHostAddress()){
950 std::lock_guard lockMapping(upnpMappingsMutex_);
951 upnpMappings->emplace(mapPtr->getMapKey(), mapPtr);
952 } else {
953 *isFailed = true;
954 }
955 } else if (mapPtr->getState() == MappingState::FAILED) {
956 *isFailed = true;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400957 if (logger_)
Amna49da80f2024-08-26 16:54:40 -0400958 logger_->error("[ice:{}] UPNP mapping failed: {:s}",
959 fmt::ptr(this),
960 mapPtr->toString(true));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400961 }
Amna49da80f2024-08-26 16:54:40 -0400962 upnpCv_.notify_all();
963 });
964 // Request the mapping
965 upnp_->reserveMapping(requestedMap);
966 }
967 upnpCv_.wait_for(lock, PORT_MAPPING_TIMEOUT, [&] {
968 return upnpMappings->size() == compCount_ or *isFailed;
969 });
970
971 std::lock_guard lockMapping(upnpMappingsMutex_);
972
973 // remove the notify callback
974 for (auto& map : *upnpMappings) {
975 map.second->setNotifyCallback(nullptr);
976 }
977 // Check the number of mappings
978 if (upnpMappings->size() != compCount_) {
979 if (logger_)
980 logger_->error("[ice:{}] UPNP mapping failed: expected {:d} mappings, got {:d}",
981 fmt::ptr(this),
982 compCount_,
983 upnpMappings->size());
984 // release all mappings
985 for (auto& map : *upnpMappings) {
986 upnp_->releaseMapping(*map.second);
987 }
988 } else {
989 for (auto& map : *upnpMappings) {
990 upnpMappings_.emplace(map.first, *map.second);
991 if(logger_)
992 logger_->debug("[ice:{}] UPNP mapping {:s} successfully allocated\n",
993 fmt::ptr(this),
994 map.second->toString(true));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400995 }
996 }
997}
998
999bool
1000IceTransport::Impl::hasUpnp() const
1001{
1002 return upnp_ and upnpMappings_.size() == compCount_;
1003}
1004
1005void
1006IceTransport::Impl::addServerReflexiveCandidates(
1007 const std::vector<std::pair<IpAddr, IpAddr>>& addrList)
1008{
1009 if (addrList.size() != compCount_) {
1010 if (logger_)
1011 logger_->warn("[ice:{}] Provided addr list size {} does not match component count {}",
1012 fmt::ptr(this),
1013 addrList.size(),
1014 compCount_);
1015 return;
1016 }
1017 if (compCount_ > PJ_ICE_MAX_COMP) {
1018 if (logger_)
1019 logger_->error("[ice:{}] Too many components", fmt::ptr(this));
1020 return;
1021 }
1022
1023 // Add config for server reflexive candidates (UPNP or from DHT).
1024 if (not addStunConfig(pj_AF_INET()))
1025 return;
1026
1027 assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN);
1028 auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1];
1029
1030 for (unsigned id = 1; id <= compCount_; id++) {
1031 auto idx = id - 1;
1032 auto& localAddr = addrList[idx].first;
1033 auto& publicAddr = addrList[idx].second;
1034
1035 if (logger_)
1036 logger_->debug("[ice:{}] Add srflx reflexive candidates [{:s} : {:s}] for comp {:d}",
1037 fmt::ptr(this),
1038 localAddr.toString(true),
1039 publicAddr.toString(true),
1040 id);
1041
1042 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr());
1043 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr());
1044
1045 if (isTcpEnabled()) {
1046 if (publicAddr.getPort() == 9) {
1047 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE;
1048 } else {
1049 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE;
1050 }
1051 } else {
1052 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP;
1053 }
1054 }
1055
1056 stun.cfg.user_mapping_cnt = compCount_;
1057}
1058
1059std::vector<std::pair<IpAddr, IpAddr>>
1060IceTransport::Impl::setupGenericReflexiveCandidates()
1061{
1062 if (not accountLocalAddr_) {
1063 if (logger_)
1064 logger_->warn("[ice:{}] Missing local address, generic srflx candidates wont be generated!",
1065 fmt::ptr(this));
1066 return {};
1067 }
1068
1069 if (not accountPublicAddr_) {
1070 if (logger_)
1071 logger_->warn("[ice:{}] Missing public address, generic srflx candidates wont be generated!",
1072 fmt::ptr(this));
1073 return {};
1074 }
1075
1076 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1077 auto isTcp = isTcpEnabled();
1078
1079 addrList.reserve(compCount_);
1080 for (unsigned id = 1; id <= compCount_; id++) {
1081 // For TCP, the type is set to active, because most likely the incoming
1082 // connection will be blocked by the NAT.
1083 // For UDP use random port number.
1084 uint16_t port = isTcp ? 9
1085 : upnp::Controller::generateRandomPort(isTcp ? PortType::TCP
1086 : PortType::UDP);
1087
1088 accountLocalAddr_.setPort(port);
1089 accountPublicAddr_.setPort(port);
1090 addrList.emplace_back(accountLocalAddr_, accountPublicAddr_);
1091 }
1092
1093 return addrList;
1094}
1095
1096std::vector<std::pair<IpAddr, IpAddr>>
1097IceTransport::Impl::setupUpnpReflexiveCandidates()
1098{
1099 // Add UPNP server reflexive candidates if available.
1100 if (not hasUpnp())
1101 return {};
1102
Adrien Béraud024c46f2024-03-02 23:53:18 -05001103 std::lock_guard lock(upnpMappingsMutex_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001104
1105 if (upnpMappings_.size() < (size_t)compCount_) {
1106 if (logger_)
1107 logger_->warn("[ice:{}] Not enough mappings {:d}. Expected {:d}",
1108 fmt::ptr(this),
1109 upnpMappings_.size(),
1110 compCount_);
1111 return {};
1112 }
1113
1114 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1115
1116 addrList.reserve(upnpMappings_.size());
1117 for (auto const& [_, map] : upnpMappings_) {
1118 assert(map.getMapKey());
1119 IpAddr localAddr {map.getInternalAddress()};
1120 localAddr.setPort(map.getInternalPort());
1121 IpAddr publicAddr {map.getExternalAddress()};
1122 publicAddr.setPort(map.getExternalPort());
1123 addrList.emplace_back(localAddr, publicAddr);
1124 }
1125
1126 return addrList;
1127}
1128
1129void
1130IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr)
1131{
1132 ASSERT_COMP_ID(compId, compCount_);
1133
1134 iceDefaultRemoteAddr_[compId - 1] = addr;
1135 // The port does not matter. Set it 0 to avoid confusion.
1136 iceDefaultRemoteAddr_[compId - 1].setPort(0);
1137}
1138
1139IpAddr
1140IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const
1141{
Adrien Béraud0e1bffa2023-10-15 17:32:55 -04001142 if (compId > compCount_) {
1143 if (logger_)
1144 logger_->error("[ice:{}] Invalid component id {:d}", fmt::ptr(this), compId);
1145 return {};
1146 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001147 return iceDefaultRemoteAddr_[compId - 1];
1148}
1149
1150void
1151IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size)
1152{
1153 ASSERT_COMP_ID(comp_id, compCount_);
1154
1155 jami_tracepoint_if_enabled(ice_transport_recv,
1156 reinterpret_cast<uint64_t>(this),
1157 comp_id,
1158 size,
1159 getRemoteAddress(comp_id).toString().c_str());
1160 if (size == 0)
1161 return;
1162
1163 {
1164 auto& io = compIO_[comp_id - 1];
Adrien Béraud024c46f2024-03-02 23:53:18 -05001165 std::lock_guard lk(io.mutex);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001166
1167 if (io.recvCb) {
1168 io.recvCb((uint8_t*) pkt, size);
1169 return;
1170 }
1171 }
1172
1173 std::error_code ec;
1174 auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec);
1175 if (err < 0) {
1176 if (logger_)
1177 logger_->error("[ice:{}] rx: channel is closed", fmt::ptr(this));
1178 }
1179}
1180
1181bool
1182IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout)
1183{
1184 IceLock lk(icest_);
1185
1186 if (not iceCV_.wait_for(lk, timeout, [this] {
1187 return threadTerminateFlags_ or _isInitialized() or _isFailed();
1188 })) {
1189 if (logger_)
1190 logger_->warn("[ice:{}] waitForInitialization: timeout", fmt::ptr(this));
1191 return false;
1192 }
1193
1194 return _isInitialized();
1195}
1196
1197//==============================================================================
1198
Adrien Béraud89933c12023-07-26 14:53:30 -04001199IceTransport::IceTransport(std::string_view name, const std::shared_ptr<dht::log::Logger>& logger)
1200 : pimpl_ {std::make_unique<Impl>(name, logger)}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001201{}
1202
1203IceTransport::~IceTransport()
1204{
1205 cancelOperations();
1206}
1207
1208const std::shared_ptr<dht::log::Logger>&
1209IceTransport::logger() const
1210{
1211 return pimpl_->logger_;
1212}
1213
1214void
1215IceTransport::initIceInstance(const IceTransportOptions& options)
1216{
1217 pimpl_->initIceInstance(options);
1218 jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this));
1219}
1220
1221bool
1222IceTransport::isInitialized() const
1223{
1224 IceLock lk(pimpl_->icest_);
1225 return pimpl_->_isInitialized();
1226}
1227
1228bool
1229IceTransport::isStarted() const
1230{
1231 IceLock lk(pimpl_->icest_);
1232 return pimpl_->_isStarted();
1233}
1234
1235bool
1236IceTransport::isRunning() const
1237{
1238 if (!pimpl_->icest_)
1239 return false;
1240 IceLock lk(pimpl_->icest_);
1241 return pimpl_->_isRunning();
1242}
1243
1244bool
1245IceTransport::isFailed() const
1246{
1247 return pimpl_->_isFailed();
1248}
1249
1250unsigned
1251IceTransport::getComponentCount() const
1252{
1253 return pimpl_->compCount_;
1254}
1255
1256bool
1257IceTransport::setSlaveSession()
1258{
1259 return pimpl_->setSlaveSession();
1260}
1261bool
1262IceTransport::setInitiatorSession()
1263{
1264 return pimpl_->setInitiatorSession();
1265}
1266
1267bool
1268IceTransport::isInitiator() const
1269{
1270 if (isInitialized()) {
1271 return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
1272 }
1273 return pimpl_->initiatorSession_;
1274}
1275
1276bool
1277IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& rem_candidates)
1278{
1279 if (not isInitialized()) {
1280 if (pimpl_->logger_)
1281 pimpl_->logger_->error("[ice:{}] not initialized transport", fmt::ptr(pimpl_.get()));
1282 pimpl_->is_stopped_ = true;
1283 return false;
1284 }
1285
1286 // pj_ice_strans_start_ice crashes if remote candidates array is empty
1287 if (rem_candidates.empty()) {
1288 if (pimpl_->logger_)
1289 pimpl_->logger_->error("[ice:{}] start failed: no remote candidates", fmt::ptr(pimpl_.get()));
1290 pimpl_->is_stopped_ = true;
1291 return false;
1292 }
1293
1294 auto comp_cnt = std::max(1u, getComponentCount());
1295 if (rem_candidates.size() / comp_cnt > PJ_ICE_ST_MAX_CAND - 1) {
1296 std::vector<IceCandidate> rcands;
1297 rcands.reserve(PJ_ICE_ST_MAX_CAND - 1);
1298 if (pimpl_->logger_)
1299 pimpl_->logger_->warn("[ice:{}] too much candidates detected, trim list.", fmt::ptr(pimpl_.get()));
1300 // Just trim some candidates. To avoid to only take host candidates, iterate
1301 // through the whole list and select some host, some turn and peer reflexives
1302 // It should give at least enough infos to negotiate.
1303 auto maxHosts = 8;
1304 auto maxRelays = PJ_ICE_MAX_TURN;
1305 for (auto& c : rem_candidates) {
1306 if (c.type == PJ_ICE_CAND_TYPE_HOST) {
1307 if (maxHosts == 0)
1308 continue;
1309 maxHosts -= 1;
1310 } else if (c.type == PJ_ICE_CAND_TYPE_RELAYED) {
1311 if (maxRelays == 0)
1312 continue;
1313 maxRelays -= 1;
1314 }
1315 if (rcands.size() == PJ_ICE_ST_MAX_CAND - 1)
1316 break;
1317 rcands.emplace_back(std::move(c));
1318 }
1319 rem_candidates = std::move(rcands);
1320 }
1321
1322 pj_str_t ufrag, pwd;
1323 if (pimpl_->logger_)
1324 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:d} remote candidates)",
1325 fmt::ptr(pimpl_),
1326 rem_candidates.size());
1327
1328 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1329 pj_strset(&ufrag,
1330 (char*) rem_attrs.ufrag.c_str(),
1331 rem_attrs.ufrag.size()),
1332 pj_strset(&pwd,
1333 (char*) rem_attrs.pwd.c_str(),
1334 rem_attrs.pwd.size()),
1335 rem_candidates.size(),
1336 rem_candidates.data());
1337 if (status != PJ_SUCCESS) {
1338 if (pimpl_->logger_)
1339 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_.get()), sip_utils::sip_strerror(status));
1340 pimpl_->is_stopped_ = true;
1341 return false;
1342 }
1343
1344 return true;
1345}
1346
1347bool
1348IceTransport::startIce(const SDP& sdp)
1349{
1350 if (pimpl_->streamsCount_ != 1) {
1351 if (pimpl_->logger_)
Adrien Béraud49b07192023-06-13 20:03:33 -04001352 pimpl_->logger_->error(FMT_STRING("Expected exactly one stream per SDP (found {:d} streams)"), pimpl_->streamsCount_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001353 return false;
1354 }
1355
1356 if (not isInitialized()) {
1357 if (pimpl_->logger_)
1358 pimpl_->logger_->error(FMT_STRING("[ice:{}] not initialized transport"), fmt::ptr(pimpl_));
1359 pimpl_->is_stopped_ = true;
1360 return false;
1361 }
1362
1363 for (unsigned id = 1; id <= getComponentCount(); id++) {
1364 auto candVec = getLocalCandidates(id);
1365 for (auto const& cand : candVec) {
1366 if (pimpl_->logger_)
1367 pimpl_->logger_->debug("[ice:{}] Using local candidate {:s} for comp {:d}",
1368 fmt::ptr(pimpl_), cand, id);
1369 }
1370 }
1371
1372 if (pimpl_->logger_)
1373 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:u} remote candidates)",
1374 fmt::ptr(pimpl_), sdp.candidates.size());
1375 pj_str_t ufrag, pwd;
1376
1377 std::vector<IceCandidate> rem_candidates;
1378 rem_candidates.reserve(sdp.candidates.size());
1379 IceCandidate cand;
1380 for (const auto& line : sdp.candidates) {
1381 if (parseIceAttributeLine(0, line, cand))
1382 rem_candidates.emplace_back(cand);
1383 }
1384
1385 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1386 pj_strset(&ufrag,
1387 (char*) sdp.ufrag.c_str(),
1388 sdp.ufrag.size()),
1389 pj_strset(&pwd, (char*) sdp.pwd.c_str(), sdp.pwd.size()),
1390 rem_candidates.size(),
1391 rem_candidates.data());
1392 if (status != PJ_SUCCESS) {
1393 if (pimpl_->logger_)
1394 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1395 pimpl_->is_stopped_ = true;
1396 return false;
1397 }
1398
1399 return true;
1400}
1401
1402void
1403IceTransport::cancelOperations()
1404{
1405 pimpl_->cancelOperations();
1406}
1407
1408IpAddr
1409IceTransport::getLocalAddress(unsigned comp_id) const
1410{
1411 return pimpl_->getLocalAddress(comp_id);
1412}
1413
1414IpAddr
1415IceTransport::getRemoteAddress(unsigned comp_id) const
1416{
1417 // Return the default remote address if set.
1418 // Note that the default remote addresses are the addresses
1419 // set in the 'c=' and 'a=rtcp' lines of the received SDP.
1420 // See pj_ice_strans_sendto2() for more details.
1421 if (auto defaultAddr = pimpl_->getDefaultRemoteAddress(comp_id)) {
1422 return defaultAddr;
1423 }
1424
1425 return pimpl_->getRemoteAddress(comp_id);
1426}
1427
1428const IceTransport::Attribute
1429IceTransport::getLocalAttributes() const
1430{
1431 return {pimpl_->local_ufrag_, pimpl_->local_pwd_};
1432}
1433
1434std::vector<std::string>
1435IceTransport::getLocalCandidates(unsigned comp_id) const
1436{
1437 ASSERT_COMP_ID(comp_id, getComponentCount());
1438 std::vector<std::string> res;
1439 pj_ice_sess_cand cand[MAX_CANDIDATES];
1440 unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
1441
1442 if (!isInitialized()) {
1443 return res;
1444 }
1445
1446 if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
1447 if (pimpl_->logger_)
1448 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1449 return res;
1450 }
1451
1452 res.reserve(cand_cnt);
1453 for (unsigned i = 0; i < cand_cnt; ++i) {
1454 /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544)
1455 * candidate-attribute = "candidate" ":" foundation SP component-id
1456 * SP "TCP" SP priority SP connection-address SP port SP cand-type [SP
1457 * rel-addr] [SP rel-port] SP tcp-type-ext
1458 * *(SP extension-att-name SP
1459 * extension-att-value)
1460 *
1461 * tcp-type-ext = "tcptype" SP tcp-type
1462 * tcp-type = "active" / "passive" / "so"
1463 */
1464 char ipaddr[PJ_INET6_ADDRSTRLEN];
1465 std::string tcp_type;
1466 if (cand[i].transport != PJ_CAND_UDP) {
1467 tcp_type += " tcptype";
1468 switch (cand[i].transport) {
1469 case PJ_CAND_TCP_ACTIVE:
1470 tcp_type += " active";
1471 break;
1472 case PJ_CAND_TCP_PASSIVE:
1473 tcp_type += " passive";
1474 break;
1475 case PJ_CAND_TCP_SO:
1476 default:
1477 tcp_type += " so";
1478 break;
1479 }
1480 }
1481 res.emplace_back(
1482 fmt::format("{} {} {} {} {} {} typ {}{}",
1483 sip_utils::as_view(cand[i].foundation),
1484 cand[i].comp_id,
1485 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1486 cand[i].prio,
1487 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1488 pj_sockaddr_get_port(&cand[i].addr),
1489 pj_ice_get_cand_type_name(cand[i].type),
1490 tcp_type));
1491 }
1492
1493 return res;
1494}
1495std::vector<std::string>
1496IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
1497{
1498 ASSERT_COMP_ID(compId, getComponentCount());
1499
1500 std::vector<std::string> res;
1501 pj_ice_sess_cand cand[MAX_CANDIDATES];
1502 unsigned cand_cnt = MAX_CANDIDATES;
1503
1504 if (not isInitialized()) {
1505 return res;
1506 }
1507
1508 // In the implementation, the component IDs are enumerated globally
1509 // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create
1510 // only one pj_ice_strans instance. However, the component IDs are
1511 // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in
1512 // order to be compliant with the spec.
1513
1514 auto globalCompId = streamIdx * 2 + compId;
1515 if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) {
1516 if (pimpl_->logger_)
1517 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1518 return res;
1519 }
1520
1521 res.reserve(cand_cnt);
1522 // Build ICE attributes according to RFC 6544, section 4.5.
1523 for (unsigned i = 0; i < cand_cnt; ++i) {
1524 char ipaddr[PJ_INET6_ADDRSTRLEN];
1525 std::string tcp_type;
1526 if (cand[i].transport != PJ_CAND_UDP) {
1527 tcp_type += " tcptype";
1528 switch (cand[i].transport) {
1529 case PJ_CAND_TCP_ACTIVE:
1530 tcp_type += " active";
1531 break;
1532 case PJ_CAND_TCP_PASSIVE:
1533 tcp_type += " passive";
1534 break;
1535 case PJ_CAND_TCP_SO:
1536 default:
1537 tcp_type += " so";
1538 break;
1539 }
1540 }
1541 res.emplace_back(
1542 fmt::format("{} {} {} {} {} {} typ {}{}",
1543 sip_utils::as_view(cand[i].foundation),
1544 compId,
1545 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1546 cand[i].prio,
1547 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1548 pj_sockaddr_get_port(&cand[i].addr),
1549 pj_ice_get_cand_type_name(cand[i].type),
1550 tcp_type));
1551 }
1552
1553 return res;
1554}
1555
1556bool
1557IceTransport::parseIceAttributeLine(unsigned streamIdx,
1558 const std::string& line,
1559 IceCandidate& cand) const
1560{
1561 // Silently ignore empty lines
1562 if (line.empty())
1563 return false;
1564
1565 if (streamIdx >= pimpl_->streamsCount_) {
1566 throw std::runtime_error(fmt::format("Stream index {:d} is invalid!", streamIdx));
1567 }
1568
1569 int af, cnt;
1570 char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32];
1571 pj_str_t tmpaddr;
1572 unsigned comp_id, prio, port;
1573 pj_status_t status;
1574 pj_bool_t is_tcp = PJ_FALSE;
1575
1576 // Parse ICE attribute line according to RFC-6544 section 4.5.
1577 // TODO/WARNING: There is no fail-safe in case of malformed attributes.
1578 cnt = sscanf(line.c_str(),
1579 "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n",
1580 foundation,
1581 &comp_id,
1582 transport,
1583 &prio,
1584 ipaddr,
1585 &port,
1586 type,
1587 tcp_type);
1588 if (cnt != 7 && cnt != 8) {
1589 if (pimpl_->logger_)
1590 pimpl_->logger_->error("[ice:{}] Invalid ICE candidate line: {:s}", fmt::ptr(pimpl_), line);
1591 return false;
1592 }
1593
1594 if (strcmp(transport, "TCP") == 0) {
1595 is_tcp = PJ_TRUE;
1596 }
1597
1598 pj_bzero(&cand, sizeof(IceCandidate));
1599
1600 if (strcmp(type, "host") == 0)
1601 cand.type = PJ_ICE_CAND_TYPE_HOST;
1602 else if (strcmp(type, "srflx") == 0)
1603 cand.type = PJ_ICE_CAND_TYPE_SRFLX;
1604 else if (strcmp(type, "prflx") == 0)
1605 cand.type = PJ_ICE_CAND_TYPE_PRFLX;
1606 else if (strcmp(type, "relay") == 0)
1607 cand.type = PJ_ICE_CAND_TYPE_RELAYED;
1608 else {
1609 if (pimpl_->logger_)
1610 pimpl_->logger_->warn("[ice:{}] invalid remote candidate type '{:s}'", fmt::ptr(pimpl_), type);
1611 return false;
1612 }
1613
1614 if (is_tcp) {
1615 if (strcmp(tcp_type, "active") == 0)
1616 cand.transport = PJ_CAND_TCP_ACTIVE;
1617 else if (strcmp(tcp_type, "passive") == 0)
1618 cand.transport = PJ_CAND_TCP_PASSIVE;
1619 else if (strcmp(tcp_type, "so") == 0)
1620 cand.transport = PJ_CAND_TCP_SO;
1621 else {
1622 if (pimpl_->logger_)
1623 pimpl_->logger_->warn("[ice:{}] invalid transport type type '{:s}'", fmt::ptr(pimpl_), tcp_type);
1624 return false;
1625 }
1626 } else {
1627 cand.transport = PJ_CAND_UDP;
1628 }
1629
1630 // If the component Id is enumerated relative to media, convert
1631 // it to absolute enumeration.
1632 if (comp_id <= pimpl_->compCountPerStream_) {
1633 comp_id += pimpl_->compCountPerStream_ * streamIdx;
1634 }
1635 cand.comp_id = (pj_uint8_t) comp_id;
1636
1637 cand.prio = prio;
1638
1639 if (strchr(ipaddr, ':'))
1640 af = pj_AF_INET6();
1641 else {
1642 af = pj_AF_INET();
1643 pimpl_->onlyIPv4Private_ &= IpAddr(ipaddr).isPrivate();
1644 }
1645
1646 tmpaddr = pj_str(ipaddr);
1647 pj_sockaddr_init(af, &cand.addr, NULL, 0);
1648 status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr);
1649 if (status != PJ_SUCCESS) {
1650 if (pimpl_->logger_)
1651 pimpl_->logger_->warn("[ice:{}] invalid IP address '{:s}'", fmt::ptr(pimpl_), ipaddr);
1652 return false;
1653 }
1654
1655 pj_sockaddr_set_port(&cand.addr, (pj_uint16_t) port);
1656 pj_strdup2(pimpl_->pool_.get(), &cand.foundation, foundation);
1657
1658 return true;
1659}
1660
1661ssize_t
1662IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec)
1663{
1664 ASSERT_COMP_ID(compId, getComponentCount());
1665 auto& io = pimpl_->compIO_[compId - 1];
Adrien Béraud024c46f2024-03-02 23:53:18 -05001666 std::lock_guard lk(io.mutex);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001667
1668 if (io.queue.empty()) {
1669 ec = std::make_error_code(std::errc::resource_unavailable_try_again);
1670 return -1;
1671 }
1672
1673 auto& packet = io.queue.front();
Adrien Béraud21d5f462023-08-27 12:06:21 -04001674 const auto count = std::min(len, packet.size());
1675 std::copy_n(packet.begin(), count, buf);
1676 if (count == packet.size()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001677 io.queue.pop_front();
1678 } else {
Adrien Béraud21d5f462023-08-27 12:06:21 -04001679 packet.erase(packet.begin(), packet.begin() + count);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001680 }
1681
1682 ec.clear();
1683 return count;
1684}
1685
1686ssize_t
1687IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec)
1688{
1689 ASSERT_COMP_ID(compId, getComponentCount());
1690 return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec);
1691}
1692
1693void
1694IceTransport::setOnRecv(unsigned compId, IceRecvCb cb)
1695{
1696 ASSERT_COMP_ID(compId, getComponentCount());
1697
1698 auto& io = pimpl_->compIO_[compId - 1];
Adrien Béraud024c46f2024-03-02 23:53:18 -05001699 std::lock_guard lk(io.mutex);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001700 io.recvCb = std::move(cb);
1701
1702 if (io.recvCb) {
1703 // Flush existing queue using the callback
1704 for (const auto& packet : io.queue)
Adrien Béraud21d5f462023-08-27 12:06:21 -04001705 io.recvCb((uint8_t*) packet.data(), packet.size());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001706 io.queue.clear();
1707 }
1708}
1709
1710void
1711IceTransport::setOnShutdown(onShutdownCb&& cb)
1712{
Adrien Béraudef3e9fd2023-10-01 21:15:15 -04001713 pimpl_->scb = std::move(cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001714}
1715
1716ssize_t
1717IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
1718{
1719 ASSERT_COMP_ID(compId, getComponentCount());
1720
1721 auto remote = getRemoteAddress(compId);
1722
1723 if (!remote) {
1724 if (pimpl_->logger_)
1725 pimpl_->logger_->error("[ice:{}] can't find remote address for component {:d}", fmt::ptr(pimpl_), compId);
1726 errno = EINVAL;
1727 return -1;
1728 }
1729
1730 std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock);
1731 if (isTCPEnabled())
1732 dlk.lock();
1733
1734 jami_tracepoint(ice_transport_send,
1735 reinterpret_cast<uint64_t>(this),
1736 compId,
1737 len,
1738 remote.toString().c_str());
1739
1740 auto status = pj_ice_strans_sendto2(pimpl_->icest_,
1741 compId,
1742 buf,
1743 len,
1744 remote.pjPtr(),
1745 remote.getLength());
1746
1747 jami_tracepoint(ice_transport_send_status, status);
1748
1749 if (status == PJ_EPENDING && isTCPEnabled()) {
1750 // NOTE; because we are in TCP, the sent size will count the header (2
1751 // bytes length).
1752 pimpl_->waitDataCv_.wait(dlk, [&] {
1753 return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_;
1754 });
1755 pimpl_->lastSentLen_ = 0;
1756 } else if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1757 if (status == PJ_EBUSY) {
1758 errno = EAGAIN;
1759 } else {
1760 if (pimpl_->logger_)
1761 pimpl_->logger_->error("[ice:{}] ice send failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1762 errno = EIO;
1763 }
1764 return -1;
1765 }
1766
1767 return len;
1768}
1769
1770bool
1771IceTransport::waitForInitialization(std::chrono::milliseconds timeout)
1772{
1773 return pimpl_->_waitForInitialization(timeout);
1774}
1775
1776ssize_t
1777IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec)
1778{
1779 ASSERT_COMP_ID(compId, getComponentCount());
1780 return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec);
1781}
1782
1783bool
Adrien Béraud02d6dad2023-11-06 09:26:51 -05001784IceTransport::isTCPEnabled() const
Adrien Béraud612b55b2023-05-29 10:42:04 -04001785{
1786 return pimpl_->isTcpEnabled();
1787}
1788
1789ICESDP
1790IceTransport::parseIceCandidates(std::string_view sdp_msg)
1791{
1792 if (pimpl_->streamsCount_ != 1) {
1793 if (pimpl_->logger_)
Sébastien Blincf569402023-07-27 09:46:40 -04001794 pimpl_->logger_->error("Expected exactly one stream per SDP (found {} streams)", pimpl_->streamsCount_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001795 return {};
1796 }
1797
1798 ICESDP res;
1799 int nr = 0;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -04001800 for (std::string_view line; dhtnet::getline(sdp_msg, line); nr++) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001801 if (nr == 0) {
1802 res.rem_ufrag = line;
1803 } else if (nr == 1) {
1804 res.rem_pwd = line;
1805 } else {
1806 IceCandidate cand;
1807 if (parseIceAttributeLine(0, std::string(line), cand)) {
1808 if (pimpl_->logger_)
1809 pimpl_->logger_->debug("[ice:{}] Add remote candidate: {}",
1810 fmt::ptr(pimpl_),
1811 line);
1812 res.rem_candidates.emplace_back(cand);
1813 }
1814 }
1815 }
1816 return res;
1817}
1818
1819void
1820IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr)
1821{
1822 pimpl_->setDefaultRemoteAddress(comp_id, addr);
1823}
1824
1825std::string
1826IceTransport::link() const
1827{
1828 return pimpl_->link();
1829}
1830
1831//==============================================================================
1832
Adrien Béraud89933c12023-07-26 14:53:30 -04001833IceTransportFactory::IceTransportFactory(const std::shared_ptr<Logger>& logger)
François-Simon Fauteux-Chapleau2ef5b662024-03-27 12:30:26 -04001834 : pjInitLock_()
1835 // Warning: pj_caching_pool_destroy will segfault if it's called before
1836 // pj_caching_pool_init. Hence, any member which appears in the initializer
1837 // list and whose constructor can fail (such as pjInitLock_) must be constructed
1838 // before cp_ (which means it must be declared before cp_ in the class definition).
1839 , cp_(new pj_caching_pool(),
Adrien Béraud612b55b2023-05-29 10:42:04 -04001840 [](pj_caching_pool* p) {
1841 pj_caching_pool_destroy(p);
1842 delete p;
1843 })
1844 , ice_cfg_()
Adrien Béraud89933c12023-07-26 14:53:30 -04001845 , logger_(logger)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001846{
1847 pj_caching_pool_init(cp_.get(), NULL, 0);
1848
1849 pj_ice_strans_cfg_default(&ice_cfg_);
1850 ice_cfg_.stun_cfg.pf = &cp_->factory;
1851
1852 // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires
1853 // a minimum of 500ms on fixed-line links. Our usual case is wireless links.
1854 // This solves too long ICE exchange by DHT.
1855 // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout.
1856 ice_cfg_.stun_cfg.rto_msec = 500;
1857
1858 // See https://tools.ietf.org/html/rfc5245#section-8.1.1.2
1859 // If enabled, it may help speed-up the connectivity, but may cause
1860 // the nomination of sub-optimal pairs.
1861 ice_cfg_.opt.aggressive = PJ_FALSE;
1862}
1863
1864IceTransportFactory::~IceTransportFactory() {}
1865
1866std::shared_ptr<IceTransport>
1867IceTransportFactory::createTransport(std::string_view name)
1868{
Adrien Béraud9cf66f12023-10-01 21:15:15 -04001869 return std::make_shared<IceTransport>(name, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001870}
1871
1872std::unique_ptr<IceTransport>
1873IceTransportFactory::createUTransport(std::string_view name)
1874{
Adrien Béraud9cf66f12023-10-01 21:15:15 -04001875 return std::make_unique<IceTransport>(name, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001876}
1877
1878//==============================================================================
1879
1880void
1881IceSocket::close()
1882{
1883 if (ice_transport_)
1884 ice_transport_->setOnRecv(compId_, {});
1885 ice_transport_.reset();
1886}
1887
1888ssize_t
1889IceSocket::send(const unsigned char* buf, size_t len)
1890{
1891 if (not ice_transport_)
1892 return -1;
1893 return ice_transport_->send(compId_, buf, len);
1894}
1895
1896ssize_t
1897IceSocket::waitForData(std::chrono::milliseconds timeout)
1898{
1899 if (not ice_transport_)
1900 return -1;
1901
1902 std::error_code ec;
1903 return ice_transport_->waitForData(compId_, timeout, ec);
1904}
1905
1906void
1907IceSocket::setOnRecv(IceRecvCb cb)
1908{
1909 if (ice_transport_)
1910 ice_transport_->setOnRecv(compId_, cb);
1911}
1912
1913uint16_t
1914IceSocket::getTransportOverhead()
1915{
1916 if (not ice_transport_)
1917 return 0;
1918
1919 return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? IPV4_HEADER_SIZE
1920 : IPV6_HEADER_SIZE;
1921}
1922
1923void
1924IceSocket::setDefaultRemoteAddress(const IpAddr& addr)
1925{
1926 if (ice_transport_)
1927 ice_transport_->setDefaultRemoteAddress(compId_, addr);
1928}
1929
Sébastien Blin464bdff2023-07-19 08:02:53 -04001930} // namespace dhtnet