blob: 58e722de4de93f8f91750f6361e8724301f38ee2 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
3 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
Adrien Béraudcb753622023-07-17 22:32:49 -040015 * along with this program. If not, see <https://www.gnu.org/licenses/>.
Adrien Béraud612b55b2023-05-29 10:42:04 -040016 */
Adrien Béraud612b55b2023-05-29 10:42:04 -040017#include "ice_transport.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040018#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040019#include "ice_socket.h"
20#include "sip_utils.h"
21#include "string_utils.h"
22#include "upnp/upnp_control.h"
23#include "transport/peer_channel.h"
24#include "tracepoint/tracepoint.h"
25
26#include <opendht/logger.h>
27#include <opendht/utils.h>
28
29#include <pjlib.h>
30
31#include <map>
32#include <atomic>
33#include <queue>
34#include <mutex>
35#include <condition_variable>
36#include <thread>
37#include <utility>
38#include <tuple>
39#include <algorithm>
40#include <sstream>
41#include <chrono>
Adrien Béraud67ff0772023-07-12 11:22:29 -040042#include <memory>
Adrien Béraud612b55b2023-05-29 10:42:04 -040043#include <cerrno>
44
45#include "pj/limits.h"
46
// Throw std::runtime_error naming the failed expression when a PJSIP call
// does not evaluate to PJ_SUCCESS. The expression is evaluated exactly once.
#define TRY(ret) \
    do { \
        if ((ret) != PJ_SUCCESS) \
            throw std::runtime_error(#ret " failed"); \
    } while (0)

// Validate that the component ID is within the expected range [1, compCount].
// Each argument is evaluated exactly once, so expressions with side effects
// can be passed safely.
#define ASSERT_COMP_ID(compId, compCount) \
    do { \
        const auto assertCompIdValue_ = (compId); \
        const auto assertCompIdCount_ = (compCount); \
        if (assertCompIdValue_ == 0 or assertCompIdValue_ > assertCompIdCount_) \
            throw std::runtime_error("Invalid component ID " \
                                     + (std::to_string(assertCompIdValue_))); \
    } while (0)
59
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040060namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040061
/// Value assigned to max_pkt_size of every STUN/TURN transport configured in this file
static constexpr unsigned STUN_MAX_PACKET_SIZE = 8192;
/// Size in bytes of IPV6 packet header
static constexpr uint16_t IPV6_HEADER_SIZE {40};
/// Size in bytes of IPV4 packet header
static constexpr uint16_t IPV4_HEADER_SIZE {20};
static constexpr int MAX_CANDIDATES = 32;
/// Upper bound (milliseconds) on the time spent flushing timers at destruction
static constexpr int MAX_DESTRUCTION_TIMEOUT = 3000;
/// Polling period (milliseconds) used when handling IO/timer events
static constexpr int HANDLE_EVENT_DURATION = 500;
68
69//==============================================================================
70
71using MutexGuard = std::lock_guard<std::mutex>;
72using MutexLock = std::unique_lock<std::mutex>;
73using namespace upnp;
74
75//==============================================================================
76
77class IceLock
78{
79 pj_grp_lock_t* lk_;
80
81public:
82 IceLock(pj_ice_strans* strans)
83 : lk_(pj_ice_strans_get_grp_lock(strans))
84 {
85 lock();
86 }
87
88 ~IceLock() { unlock(); }
89
90 void lock() { if (lk_) pj_grp_lock_acquire(lk_); }
91
92 void unlock() { if (lk_) pj_grp_lock_release(lk_); }
93};
94
95class IceTransport::Impl
96{
97public:
Adrien Béraud89933c12023-07-26 14:53:30 -040098 Impl(std::string_view name, const std::shared_ptr<Logger>& logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -040099 ~Impl();
100
101 void initIceInstance(const IceTransportOptions& options);
102
103 void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status);
104
105 void onReceiveData(unsigned comp_id, void* pkt, pj_size_t size);
106
107 /**
108 * Set/change transport role as initiator.
109 * Should be called before start method.
110 */
111 bool setInitiatorSession();
112
113 /**
114 * Set/change transport role as slave.
115 * Should be called before start method.
116 */
117 bool setSlaveSession();
118 bool createIceSession(pj_ice_sess_role role);
119
120 void getUFragPwd();
121
122 std::string link() const;
123
124 bool _isInitialized() const;
125 bool _isStarted() const;
126 bool _isRunning() const;
127 bool _isFailed() const;
128 bool _waitForInitialization(std::chrono::milliseconds timeout);
129
130 const pj_ice_sess_cand* getSelectedCandidate(unsigned comp_id, bool remote) const;
131 IpAddr getLocalAddress(unsigned comp_id) const;
132 IpAddr getRemoteAddress(unsigned comp_id) const;
133 static const char* getCandidateType(const pj_ice_sess_cand* cand);
134 bool isTcpEnabled() const { return config_.protocol == PJ_ICE_TP_TCP; }
135 bool addStunConfig(int af);
136 void requestUpnpMappings();
137 bool hasUpnp() const;
138 // Take a list of address pairs (local/public) and add them as
139 // reflexive candidates using STUN config.
140 void addServerReflexiveCandidates(const std::vector<std::pair<IpAddr, IpAddr>>& addrList);
141 // Generate server reflexive candidates using the published (DHT/Account) address
142 std::vector<std::pair<IpAddr, IpAddr>> setupGenericReflexiveCandidates();
143 // Generate server reflexive candidates using UPNP mappings.
144 std::vector<std::pair<IpAddr, IpAddr>> setupUpnpReflexiveCandidates();
145 void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
146 IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
147 bool handleEvents(unsigned max_msec);
148 int flushTimerHeapAndIoQueue();
149 int checkEventQueue(int maxEventToPoll);
150
151 std::shared_ptr<dht::log::Logger> logger_ {};
152
153 std::condition_variable_any iceCV_ {};
154
155 std::string sessionName_ {};
156 std::unique_ptr<pj_pool_t, decltype(&pj_pool_release)> pool_ {nullptr, pj_pool_release};
157 bool isTcp_ {false};
158 bool upnpEnabled_ {false};
159 IceTransportCompleteCb on_initdone_cb_ {};
160 IceTransportCompleteCb on_negodone_cb_ {};
161 pj_ice_strans* icest_ {nullptr};
162 unsigned streamsCount_ {0};
163 unsigned compCountPerStream_ {0};
164 unsigned compCount_ {0};
165 std::string local_ufrag_ {};
166 std::string local_pwd_ {};
167 pj_sockaddr remoteAddr_ {};
168 pj_ice_strans_cfg config_ {};
169 //std::string last_errmsg_ {};
170
171 std::atomic_bool is_stopped_ {false};
172
173 struct Packet
174 {
175 Packet(void* pkt, pj_size_t size)
176 : data {reinterpret_cast<char*>(pkt), reinterpret_cast<char*>(pkt) + size}
177 {}
178 std::vector<char> data {};
179 };
180
181 struct ComponentIO
182 {
183 std::mutex mutex;
184 std::condition_variable cv;
185 std::deque<Packet> queue;
186 IceRecvCb recvCb;
187 };
188
189 // NOTE: Component IDs start from 1, while these three vectors
190 // are indexed from 0. Conversion from ID to vector index must
191 // be done properly.
192 std::vector<ComponentIO> compIO_ {};
193 std::vector<PeerChannel> peerChannels_ {};
194 std::vector<IpAddr> iceDefaultRemoteAddr_;
195
196 // ICE controlling role. True for controller agents and false for
197 // controlled agents
198 std::atomic_bool initiatorSession_ {true};
199
200 // Local/Public addresses used by the account owning the ICE instance.
201 IpAddr accountLocalAddr_ {};
202 IpAddr accountPublicAddr_ {};
203
204 // STUN and TURN servers
205 std::vector<StunServerInfo> stunServers_;
206 std::vector<TurnServerInfo> turnServers_;
207
208 /**
209 * Returns the IP of each candidate for a given component in the ICE session
210 */
211 struct LocalCandidate
212 {
213 IpAddr addr;
214 pj_ice_cand_transport transport;
215 };
216
217 std::shared_ptr<upnp::Controller> upnp_ {};
218 std::mutex upnpMutex_ {};
219 std::map<Mapping::key_t, Mapping> upnpMappings_;
220 std::mutex upnpMappingsMutex_ {};
221
222 bool onlyIPv4Private_ {true};
223
224 // IO/Timer events are handled by following thread
225 std::thread thread_ {};
226 std::atomic_bool threadTerminateFlags_ {false};
227
228 // Wait data on components
229 mutable std::mutex sendDataMutex_ {};
230 std::condition_variable waitDataCv_ = {};
231 pj_size_t lastSentLen_ {0};
232 bool destroying_ {false};
233 onShutdownCb scb {};
234
235 void cancelOperations()
236 {
237 for (auto& c : peerChannels_)
238 c.stop();
239 std::lock_guard<std::mutex> lk(sendDataMutex_);
240 destroying_ = true;
241 waitDataCv_.notify_all();
242 }
243};
244
245//==============================================================================
246
247/**
248 * Add stun/turn configuration or default host as candidates
249 */
250
251static void
252add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info)
253{
254 if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN)
255 throw std::runtime_error("Too many STUN configurations");
256
257 IpAddr ip {info.uri};
258
259 // Given URI cannot be DNS resolved or not IPv4 or IPv6?
260 // This prevents a crash into PJSIP when ip.toString() is called.
261 if (ip.getFamily() == AF_UNSPEC) {
262 /*JAMI_DBG("[ice (%s)] STUN server '%s' not used, unresolvable address",
263 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
264 info.uri.c_str());*/
265 return;
266 }
267
268 auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++];
269 pj_ice_strans_stun_cfg_default(&stun);
270 pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str());
271 stun.af = ip.getFamily();
272 if (!(stun.port = ip.getPort()))
273 stun.port = PJ_STUN_PORT;
274 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
275 stun.conn_type = cfg.stun.conn_type;
276 /*JAMI_DBG("[ice (%s)] added stun server '%s', port %u",
277 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
278 pj_strbuf(&stun.server),
279 stun.port);*/
280}
281
282static void
283add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info)
284{
285 if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN)
286 throw std::runtime_error("Too many TURN servers");
287
288 IpAddr ip {info.uri};
289
290 // Same comment as add_stun_server()
291 if (ip.getFamily() == AF_UNSPEC) {
292 /*JAMI_DBG("[ice (%s)] TURN server '%s' not used, unresolvable address",
293 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
294 info.uri.c_str());*/
295 return;
296 }
297
298 auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++];
299 pj_ice_strans_turn_cfg_default(&turn);
300 pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str());
301 turn.af = ip.getFamily();
302 if (!(turn.port = ip.getPort()))
303 turn.port = PJ_STUN_PORT;
304 turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
305 turn.conn_type = cfg.turn.conn_type;
306
307 // Authorization (only static plain password supported yet)
308 if (not info.password.empty()) {
309 turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
310 turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
311 pj_strset(&turn.auth_cred.data.static_cred.realm,
312 (char*) info.realm.c_str(),
313 info.realm.size());
314 pj_strset(&turn.auth_cred.data.static_cred.username,
315 (char*) info.username.c_str(),
316 info.username.size());
317 pj_strset(&turn.auth_cred.data.static_cred.data,
318 (char*) info.password.c_str(),
319 info.password.size());
320 }
321
322 /*JAMI_DBG("[ice (%s)] added turn server '%s', port %u",
323 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
324 pj_strbuf(&turn.server),
325 turn.port);*/
326}
327
328//==============================================================================
329
Adrien Béraud89933c12023-07-26 14:53:30 -0400330IceTransport::Impl::Impl(std::string_view name, const std::shared_ptr<Logger>& logger)
331 : logger_(logger), sessionName_(name)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400332{
333 if (logger_)
334 logger_->debug("[ice:{}] Creating IceTransport session for \"{:s}\"", fmt::ptr(this), name);
335}
336
337IceTransport::Impl::~Impl()
338{
339 if (logger_)
340 logger_->debug("[ice:{}] destroying {}", fmt::ptr(this), fmt::ptr(icest_));
341
342 threadTerminateFlags_ = true;
343
344 if (thread_.joinable()) {
345 thread_.join();
346 }
347
348 if (icest_) {
349 pj_ice_strans* strans = nullptr;
350
351 std::swap(strans, icest_);
352
353 // must be done before ioqueue/timer destruction
354 if (logger_)
355 logger_->debug("[ice:{}] Destroying ice_strans {}", pj_ice_strans_get_user_data(strans), fmt::ptr(strans));
356
357 pj_ice_strans_stop_ice(strans);
358 pj_ice_strans_destroy(strans);
359
360 // NOTE: This last timer heap and IO queue polling is necessary to close
361 // TURN socket.
362 // Because when destroying the TURN session pjproject creates a pj_timer
363 // to postpone the TURN destruction. This timer is only called if we poll
364 // the event queue.
365
366 int ret = flushTimerHeapAndIoQueue();
367
368 if (ret < 0) {
369 if (logger_)
370 logger_->error("[ice:{}] IO queue polling failed", fmt::ptr(this));
371 } else if (ret > 0) {
372 if (logger_)
373 logger_->error("[ice:{}] Unexpected left timer in timer heap. "
374 "Please report the bug",
375 fmt::ptr(this));
376 }
377
378 if (checkEventQueue(1) > 0) {
379 if (logger_)
380 logger_->warn("[ice:{}] Unexpected left events in IO queue", fmt::ptr(this));
381 }
382
383 if (config_.stun_cfg.ioqueue)
384 pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
385
386 if (config_.stun_cfg.timer_heap)
387 pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
388 }
389
390 if (logger_)
391 logger_->debug("[ice:%p] done destroying", fmt::ptr(this));
392 if (scb)
393 scb();
394}
395
396void
397IceTransport::Impl::initIceInstance(const IceTransportOptions& options)
398{
399 isTcp_ = options.tcpEnable;
400 upnpEnabled_ = options.upnpEnable;
401 on_initdone_cb_ = options.onInitDone;
402 on_negodone_cb_ = options.onNegoDone;
403 streamsCount_ = options.streamsCount;
404 compCountPerStream_ = options.compCountPerStream;
405 compCount_ = streamsCount_ * compCountPerStream_;
406 compIO_ = std::vector<ComponentIO>(compCount_);
407 peerChannels_ = std::vector<PeerChannel>(compCount_);
408 iceDefaultRemoteAddr_.resize(compCount_);
409 initiatorSession_ = options.master;
410 accountLocalAddr_ = std::move(options.accountLocalAddr);
411 accountPublicAddr_ = std::move(options.accountPublicAddr);
412 stunServers_ = std::move(options.stunServers);
413 turnServers_ = std::move(options.turnServers);
414
415 if (logger_)
416 logger_->debug("[ice:{}] Initializing the session - comp count {} - as a {}",
417 fmt::ptr(this),
418 compCount_,
419 initiatorSession_ ? "master" : "slave");
420
421 if (upnpEnabled_)
Adrien Béraud25c30c42023-07-05 13:46:54 -0400422 upnp_.reset(new upnp::Controller(options.upnpContext));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400423
424 config_ = options.factory->getIceCfg(); // config copy
425 if (isTcp_) {
426 config_.protocol = PJ_ICE_TP_TCP;
427 config_.stun.conn_type = PJ_STUN_TP_TCP;
428 config_.turn.conn_type = PJ_TURN_TP_TCP;
429 } else {
430 config_.protocol = PJ_ICE_TP_UDP;
431 config_.stun.conn_type = PJ_STUN_TP_UDP;
432 config_.turn.conn_type = PJ_TURN_TP_UDP;
433 }
434
435 pool_.reset(
436 pj_pool_create(options.factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL));
437 if (not pool_)
438 throw std::runtime_error("pj_pool_create() failed");
439
440 // Note: For server reflexive candidates, UPNP mappings will
441 // be used if available. Then, the public address learnt during
442 // the account registration process will be added only if it
443 // differs from the UPNP public address.
444 // Also note that UPNP candidates should be added first in order
445 // to have a higher priority when performing the connectivity
446 // checks.
447 // STUN configs layout:
448 // - index 0 : host IPv4
449 // - index 1 : host IPv6
450 // - index 2 : upnp/generic srflx IPv4.
451 // - index 3 : generic srflx (if upnp exists and different)
452
453 config_.stun_tp_cnt = 0;
454
455 if (logger_)
456 logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
457 addStunConfig(pj_AF_INET());
458 addStunConfig(pj_AF_INET6());
459
460 std::vector<std::pair<IpAddr, IpAddr>> upnpSrflxCand;
461 if (upnp_) {
462 requestUpnpMappings();
463 upnpSrflxCand = setupUpnpReflexiveCandidates();
464 if (not upnpSrflxCand.empty()) {
465 addServerReflexiveCandidates(upnpSrflxCand);
466 if (logger_)
467 logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
468 }
469 }
470
471 auto genericSrflxCand = setupGenericReflexiveCandidates();
472
473 if (not genericSrflxCand.empty()) {
474 // Generic srflx candidates will be added only if different
475 // from upnp candidates.
476 if (upnpSrflxCand.empty()
477 or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) {
478 addServerReflexiveCandidates(genericSrflxCand);
479 if (logger_)
480 logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
481 }
482 }
483
484 if (upnpSrflxCand.empty() and genericSrflxCand.empty()) {
485 if (logger_)
486 logger_->warn("[ice:{}] No server reflexive candidates added", fmt::ptr(this));
487 }
488
489 pj_ice_strans_cb icecb;
490 pj_bzero(&icecb, sizeof(icecb));
491
492 icecb.on_rx_data = [](pj_ice_strans* ice_st,
493 unsigned comp_id,
494 void* pkt,
495 pj_size_t size,
496 const pj_sockaddr_t* /*src_addr*/,
497 unsigned /*src_addr_len*/) {
498 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
499 tr->onReceiveData(comp_id, pkt, size);
500 };
501
502 icecb.on_ice_complete = [](pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) {
503 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
504 tr->onComplete(ice_st, op, status);
505 };
506
507 if (isTcp_) {
508 icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
509 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
510 std::lock_guard lk(tr->sendDataMutex_);
511 tr->lastSentLen_ += size;
512 tr->waitDataCv_.notify_all();
513 }
514 };
515 }
516
517 icecb.on_destroy = [](pj_ice_strans* ice_st) {
518 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
519 tr->cancelOperations(); // Avoid upper layer to manage this ; Stop read operations
520 };
521
522 // Add STUN servers
523 for (auto& server : stunServers_)
524 add_stun_server(*pool_, config_, server);
525
526 // Add TURN servers
527 for (auto& server : turnServers_)
528 add_turn_server(*pool_, config_, server);
529
530 static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64);
531 TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap));
532 TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue));
533 std::ostringstream sessionName {};
534 // We use the instance pointer as the PJNATH session name in order
535 // to easily identify the logs reported by PJNATH.
536 sessionName << this;
537 pj_status_t status = pj_ice_strans_create(sessionName.str().c_str(),
538 &config_,
539 compCount_,
540 this,
541 &icecb,
542 &icest_);
543
544 if (status != PJ_SUCCESS || icest_ == nullptr) {
545 throw std::runtime_error("pj_ice_strans_create() failed");
546 }
547
548 // Must be created after any potential failure
549 thread_ = std::thread([this] {
550 while (not threadTerminateFlags_) {
551 // NOTE: handleEvents can return false in this case
552 // but here we don't care if there is event or not.
553 handleEvents(HANDLE_EVENT_DURATION);
554 }
555 });
556}
557
558bool
559IceTransport::Impl::_isInitialized() const
560{
561 if (auto *icest = icest_) {
562 auto state = pj_ice_strans_get_state(icest);
563 return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
564 }
565 return false;
566}
567
568bool
569IceTransport::Impl::_isStarted() const
570{
571 if (auto *icest = icest_) {
572 auto state = pj_ice_strans_get_state(icest);
573 return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
574 }
575 return false;
576}
577
578bool
579IceTransport::Impl::_isRunning() const
580{
581 if (auto *icest = icest_) {
582 auto state = pj_ice_strans_get_state(icest);
583 return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
584 }
585 return false;
586}
587
588bool
589IceTransport::Impl::_isFailed() const
590{
591 if (auto *icest = icest_)
592 return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
593 return false;
594}
595
596bool
597IceTransport::Impl::handleEvents(unsigned max_msec)
598{
599 // By tests, never seen more than two events per 500ms
600 static constexpr auto MAX_NET_EVENTS = 2;
601
602 pj_time_val max_timeout = {0, static_cast<long>(max_msec)};
603 pj_time_val timeout = {0, 0};
604 unsigned net_event_count = 0;
605
606 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
607 auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32;
608
609 // timeout limitation
610 if (hasActiveTimer)
611 pj_time_val_normalize(&timeout);
612
613 if (PJ_TIME_VAL_GT(timeout, max_timeout)) {
614 timeout = max_timeout;
615 }
616
617 do {
618 auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
619
620 // timeout
621 if (not n_events)
622 return hasActiveTimer;
623
624 // error
625 if (n_events < 0) {
626 const auto err = pj_get_os_error();
627 // Kept as debug as some errors are "normal" in regular context
628 if (logger_)
629 logger_->debug("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
630 std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
631 return hasActiveTimer;
632 }
633
634 net_event_count += n_events;
635 timeout.sec = timeout.msec = 0;
636 } while (net_event_count < MAX_NET_EVENTS);
637 return hasActiveTimer;
638}
639
640int
641IceTransport::Impl::flushTimerHeapAndIoQueue()
642{
643 pj_time_val timerTimeout = {0, 0};
644 pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION};
645 bool hasActiveTimer = false;
646 std::chrono::milliseconds totalWaitTime {0};
647 auto const start = std::chrono::steady_clock::now();
648 // We try to process pending events as fast as possible to
649 // speed-up the release.
650 int maxEventToProcess = 10;
651
652 do {
653 if (checkEventQueue(maxEventToProcess) < 0)
654 return -1;
655
656 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout);
657 hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32);
658
659 if (hasActiveTimer) {
660 pj_time_val_normalize(&timerTimeout);
661 auto waitTime = std::chrono::milliseconds(
662 std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime)));
663 std::this_thread::sleep_for(waitTime);
664 totalWaitTime += waitTime;
665 }
666 } while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT));
667
668 auto duration = std::chrono::steady_clock::now() - start;
669 if (logger_)
670 logger_->debug("[ice:{}] Timer heap flushed after {}", fmt::ptr(this), dht::print_duration(duration));
671
672 return static_cast<int>(pj_timer_heap_count(config_.stun_cfg.timer_heap));
673}
674
675int
676IceTransport::Impl::checkEventQueue(int maxEventToPoll)
677{
678 pj_time_val timeout = {0, 0};
679 int eventCount = 0;
680 int events = 0;
681
682 do {
683 events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
684 if (events < 0) {
685 const auto err = pj_get_os_error();
686 if (logger_)
687 logger_->error("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
688 return events;
689 }
690
691 eventCount += events;
692
693 } while (events > 0 && eventCount < maxEventToPoll);
694
695 return eventCount;
696}
697
698void
699IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status)
700{
701 const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization"
702 : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation"
703 : "unknown_op";
704
705 const bool done = status == PJ_SUCCESS;
706 if (done) {
707 if (logger_)
708 logger_->debug("[ice:{}] {:s} {:s} success",
709 fmt::ptr(this),
710 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
711 opname);
712 } else {
713 if (logger_)
714 logger_->error("[ice:{}] {:s} {:s} failed: {:s}",
715 fmt::ptr(this),
716 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
717 opname,
718 sip_utils::sip_strerror(status));
719 }
720
721 if (done and op == PJ_ICE_STRANS_OP_INIT) {
722 if (initiatorSession_)
723 setInitiatorSession();
724 else
725 setSlaveSession();
726 }
727
728 if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
729 on_initdone_cb_(done);
730 else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) {
731 if (done) {
732 // Dump of connection pairs
733 if (logger_)
734 logger_->debug("[ice:{}] {:s} connection pairs ([comp id] local [type] <-> remote [type]):\n{:s}",
735 fmt::ptr(this),
736 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
737 link());
738 }
739 if (on_negodone_cb_)
740 on_negodone_cb_(done);
741 }
742
743 iceCV_.notify_all();
744}
745
746std::string
747IceTransport::Impl::link() const
748{
749 std::ostringstream out;
750 for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) {
751 auto absIdx = strm;
752 auto comp = (strm + 1) / compCountPerStream_;
753 auto laddr = getLocalAddress(absIdx);
754 auto raddr = getRemoteAddress(absIdx);
755
756 if (laddr and laddr.getPort() != 0 and raddr and raddr.getPort() != 0) {
757 out << " [" << comp << "] " << laddr.toString(true, true) << " ["
758 << getCandidateType(getSelectedCandidate(absIdx, false)) << "] "
759 << " <-> " << raddr.toString(true, true) << " ["
760 << getCandidateType(getSelectedCandidate(absIdx, true)) << "] " << '\n';
761 } else {
762 out << " [" << comp << "] disabled\n";
763 }
764 }
765 return out.str();
766}
767
768bool
769IceTransport::Impl::setInitiatorSession()
770{
771 if (logger_)
772 logger_->debug("[ice:{}] as master", fmt::ptr(this));
773 initiatorSession_ = true;
774 if (_isInitialized()) {
775 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING);
776 if (status != PJ_SUCCESS) {
777 if (logger_)
778 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
779 return false;
780 }
781 return true;
782 }
783 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING);
784}
785
786bool
787IceTransport::Impl::setSlaveSession()
788{
789 if (logger_)
790 logger_->debug("[ice:{}] as slave", fmt::ptr(this));
791 initiatorSession_ = false;
792 if (_isInitialized()) {
793 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED);
794 if (status != PJ_SUCCESS) {
795 if (logger_)
796 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
797 return false;
798 }
799 return true;
800 }
801 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED);
802}
803
804const pj_ice_sess_cand*
805IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const
806{
807 ASSERT_COMP_ID(comp_id, compCount_);
808
809 // Return the selected candidate pair. Might not be the nominated pair if
810 // ICE has not concluded yet, but should be the nominated pair afterwards.
811 if (not _isRunning()) {
812 if (logger_)
813 logger_->error("[ice:{}] ICE transport is not running", fmt::ptr(this));
814 return nullptr;
815 }
816
817 const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id);
818 if (sess == nullptr) {
819 if (logger_)
820 logger_->warn("[ice:{}] Component {} has no valid pair (disabled)", fmt::ptr(this), comp_id);
821 return nullptr;
822 }
823
824 if (remote)
825 return sess->rcand;
826 else
827 return sess->lcand;
828}
829
830IpAddr
831IceTransport::Impl::getLocalAddress(unsigned comp_id) const
832{
833 ASSERT_COMP_ID(comp_id, compCount_);
834
835 if (auto cand = getSelectedCandidate(comp_id, false))
836 return cand->addr;
837
838 return {};
839}
840
841IpAddr
842IceTransport::Impl::getRemoteAddress(unsigned comp_id) const
843{
844 ASSERT_COMP_ID(comp_id, compCount_);
845
846 if (auto cand = getSelectedCandidate(comp_id, true))
847 return cand->addr;
848
849 return {};
850}
851
852const char*
853IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand)
854{
855 auto name = cand ? pj_ice_get_cand_type_name(cand->type) : nullptr;
856 return name ? name : "?";
857}
858
859void
860IceTransport::Impl::getUFragPwd()
861{
862 if (icest_) {
863 pj_str_t local_ufrag, local_pwd;
864
865 pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr);
866 local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
867 local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
868 }
869}
870
871bool
872IceTransport::Impl::createIceSession(pj_ice_sess_role role)
873{
874 if (not icest_) {
875 return false;
876 }
877
878 if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) {
879 if (logger_)
880 logger_->error("[ice:{}] pj_ice_strans_init_ice() failed", fmt::ptr(this));
881 return false;
882 }
883
884 // Fetch some information on local configuration
885 getUFragPwd();
886
887 if (logger_)
888 logger_->debug("[ice:{}] (local) ufrag=%s, pwd=%s", fmt::ptr(this), local_ufrag_.c_str(), local_pwd_.c_str());
889
890 return true;
891}
892
893bool
894IceTransport::Impl::addStunConfig(int af)
895{
896 if (config_.stun_tp_cnt >= PJ_ICE_MAX_STUN) {
897 if (logger_)
898 logger_->error("Max number of STUN configurations reached (%i)", PJ_ICE_MAX_STUN);
899 return false;
900 }
901
902 if (af != pj_AF_INET() and af != pj_AF_INET6()) {
903 if (logger_)
904 logger_->error("Invalid address familly (%i)", af);
905 return false;
906 }
907
908 auto& stun = config_.stun_tp[config_.stun_tp_cnt++];
909
910 pj_ice_strans_stun_cfg_default(&stun);
911 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
912 stun.af = af;
913 stun.conn_type = config_.stun.conn_type;
914
915 if (logger_)
916 logger_->debug("[ice:{}] added host stun config for {:s} transport",
917 fmt::ptr(this),
918 config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
919
920 return true;
921}
922
923void
924IceTransport::Impl::requestUpnpMappings()
925{
926 // Must be called once !
927
928 std::lock_guard<std::mutex> lock(upnpMutex_);
929
930 if (not upnp_)
931 return;
932
933 auto transport = isTcpEnabled() ? PJ_CAND_TCP_PASSIVE : PJ_CAND_UDP;
934 auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP;
935
936 // Request upnp mapping for each component.
937 for (unsigned id = 1; id <= compCount_; id++) {
938 // Set port number to 0 to get any available port.
939 Mapping requestedMap(portType);
940
941 // Request the mapping
942 Mapping::sharedPtr_t mapPtr = upnp_->reserveMapping(requestedMap);
943
944 // To use a mapping, it must be valid, open and has valid host address.
945 if (mapPtr and mapPtr->getMapKey() and (mapPtr->getState() == MappingState::OPEN)
946 and mapPtr->hasValidHostAddress()) {
947 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
948 auto ret = upnpMappings_.emplace(mapPtr->getMapKey(), *mapPtr);
949 if (ret.second) {
950 if (logger_)
951 logger_->debug("[ice:{}] UPNP mapping {:s} successfully allocated",
952 fmt::ptr(this),
953 mapPtr->toString(true));
954 } else {
955 if (logger_)
956 logger_->warn("[ice:{}] UPNP mapping {:s} already in the list!",
957 fmt::ptr(this),
958 mapPtr->toString());
959 }
960 } else {
961 if (logger_)
962 logger_->warn("[ice:{}] UPNP mapping request failed!", fmt::ptr(this));
963 upnp_->releaseMapping(requestedMap);
964 }
965 }
966}
967
968bool
969IceTransport::Impl::hasUpnp() const
970{
971 return upnp_ and upnpMappings_.size() == compCount_;
972}
973
974void
975IceTransport::Impl::addServerReflexiveCandidates(
976 const std::vector<std::pair<IpAddr, IpAddr>>& addrList)
977{
978 if (addrList.size() != compCount_) {
979 if (logger_)
980 logger_->warn("[ice:{}] Provided addr list size {} does not match component count {}",
981 fmt::ptr(this),
982 addrList.size(),
983 compCount_);
984 return;
985 }
986 if (compCount_ > PJ_ICE_MAX_COMP) {
987 if (logger_)
988 logger_->error("[ice:{}] Too many components", fmt::ptr(this));
989 return;
990 }
991
992 // Add config for server reflexive candidates (UPNP or from DHT).
993 if (not addStunConfig(pj_AF_INET()))
994 return;
995
996 assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN);
997 auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1];
998
999 for (unsigned id = 1; id <= compCount_; id++) {
1000 auto idx = id - 1;
1001 auto& localAddr = addrList[idx].first;
1002 auto& publicAddr = addrList[idx].second;
1003
1004 if (logger_)
1005 logger_->debug("[ice:{}] Add srflx reflexive candidates [{:s} : {:s}] for comp {:d}",
1006 fmt::ptr(this),
1007 localAddr.toString(true),
1008 publicAddr.toString(true),
1009 id);
1010
1011 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr());
1012 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr());
1013
1014 if (isTcpEnabled()) {
1015 if (publicAddr.getPort() == 9) {
1016 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE;
1017 } else {
1018 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE;
1019 }
1020 } else {
1021 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP;
1022 }
1023 }
1024
1025 stun.cfg.user_mapping_cnt = compCount_;
1026}
1027
1028std::vector<std::pair<IpAddr, IpAddr>>
1029IceTransport::Impl::setupGenericReflexiveCandidates()
1030{
1031 if (not accountLocalAddr_) {
1032 if (logger_)
1033 logger_->warn("[ice:{}] Missing local address, generic srflx candidates wont be generated!",
1034 fmt::ptr(this));
1035 return {};
1036 }
1037
1038 if (not accountPublicAddr_) {
1039 if (logger_)
1040 logger_->warn("[ice:{}] Missing public address, generic srflx candidates wont be generated!",
1041 fmt::ptr(this));
1042 return {};
1043 }
1044
1045 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1046 auto isTcp = isTcpEnabled();
1047
1048 addrList.reserve(compCount_);
1049 for (unsigned id = 1; id <= compCount_; id++) {
1050 // For TCP, the type is set to active, because most likely the incoming
1051 // connection will be blocked by the NAT.
1052 // For UDP use random port number.
1053 uint16_t port = isTcp ? 9
1054 : upnp::Controller::generateRandomPort(isTcp ? PortType::TCP
1055 : PortType::UDP);
1056
1057 accountLocalAddr_.setPort(port);
1058 accountPublicAddr_.setPort(port);
1059 addrList.emplace_back(accountLocalAddr_, accountPublicAddr_);
1060 }
1061
1062 return addrList;
1063}
1064
1065std::vector<std::pair<IpAddr, IpAddr>>
1066IceTransport::Impl::setupUpnpReflexiveCandidates()
1067{
1068 // Add UPNP server reflexive candidates if available.
1069 if (not hasUpnp())
1070 return {};
1071
1072 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
1073
1074 if (upnpMappings_.size() < (size_t)compCount_) {
1075 if (logger_)
1076 logger_->warn("[ice:{}] Not enough mappings {:d}. Expected {:d}",
1077 fmt::ptr(this),
1078 upnpMappings_.size(),
1079 compCount_);
1080 return {};
1081 }
1082
1083 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1084
1085 addrList.reserve(upnpMappings_.size());
1086 for (auto const& [_, map] : upnpMappings_) {
1087 assert(map.getMapKey());
1088 IpAddr localAddr {map.getInternalAddress()};
1089 localAddr.setPort(map.getInternalPort());
1090 IpAddr publicAddr {map.getExternalAddress()};
1091 publicAddr.setPort(map.getExternalPort());
1092 addrList.emplace_back(localAddr, publicAddr);
1093 }
1094
1095 return addrList;
1096}
1097
1098void
1099IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr)
1100{
1101 ASSERT_COMP_ID(compId, compCount_);
1102
1103 iceDefaultRemoteAddr_[compId - 1] = addr;
1104 // The port does not matter. Set it 0 to avoid confusion.
1105 iceDefaultRemoteAddr_[compId - 1].setPort(0);
1106}
1107
1108IpAddr
1109IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const
1110{
1111 ASSERT_COMP_ID(compId, compCount_);
1112 return iceDefaultRemoteAddr_[compId - 1];
1113}
1114
1115void
1116IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size)
1117{
1118 ASSERT_COMP_ID(comp_id, compCount_);
1119
1120 jami_tracepoint_if_enabled(ice_transport_recv,
1121 reinterpret_cast<uint64_t>(this),
1122 comp_id,
1123 size,
1124 getRemoteAddress(comp_id).toString().c_str());
1125 if (size == 0)
1126 return;
1127
1128 {
1129 auto& io = compIO_[comp_id - 1];
1130 std::lock_guard<std::mutex> lk(io.mutex);
1131
1132 if (io.recvCb) {
1133 io.recvCb((uint8_t*) pkt, size);
1134 return;
1135 }
1136 }
1137
1138 std::error_code ec;
1139 auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec);
1140 if (err < 0) {
1141 if (logger_)
1142 logger_->error("[ice:{}] rx: channel is closed", fmt::ptr(this));
1143 }
1144}
1145
1146bool
1147IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout)
1148{
1149 IceLock lk(icest_);
1150
1151 if (not iceCV_.wait_for(lk, timeout, [this] {
1152 return threadTerminateFlags_ or _isInitialized() or _isFailed();
1153 })) {
1154 if (logger_)
1155 logger_->warn("[ice:{}] waitForInitialization: timeout", fmt::ptr(this));
1156 return false;
1157 }
1158
1159 return _isInitialized();
1160}
1161
1162//==============================================================================
1163
Adrien Béraud89933c12023-07-26 14:53:30 -04001164IceTransport::IceTransport(std::string_view name, const std::shared_ptr<dht::log::Logger>& logger)
1165 : pimpl_ {std::make_unique<Impl>(name, logger)}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001166{}
1167
1168IceTransport::~IceTransport()
1169{
1170 cancelOperations();
1171}
1172
1173const std::shared_ptr<dht::log::Logger>&
1174IceTransport::logger() const
1175{
1176 return pimpl_->logger_;
1177}
1178
1179void
1180IceTransport::initIceInstance(const IceTransportOptions& options)
1181{
1182 pimpl_->initIceInstance(options);
1183 jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this));
1184}
1185
1186bool
1187IceTransport::isInitialized() const
1188{
1189 IceLock lk(pimpl_->icest_);
1190 return pimpl_->_isInitialized();
1191}
1192
1193bool
1194IceTransport::isStarted() const
1195{
1196 IceLock lk(pimpl_->icest_);
1197 return pimpl_->_isStarted();
1198}
1199
1200bool
1201IceTransport::isRunning() const
1202{
1203 if (!pimpl_->icest_)
1204 return false;
1205 IceLock lk(pimpl_->icest_);
1206 return pimpl_->_isRunning();
1207}
1208
1209bool
1210IceTransport::isFailed() const
1211{
1212 return pimpl_->_isFailed();
1213}
1214
1215unsigned
1216IceTransport::getComponentCount() const
1217{
1218 return pimpl_->compCount_;
1219}
1220
1221bool
1222IceTransport::setSlaveSession()
1223{
1224 return pimpl_->setSlaveSession();
1225}
1226bool
1227IceTransport::setInitiatorSession()
1228{
1229 return pimpl_->setInitiatorSession();
1230}
1231
1232bool
1233IceTransport::isInitiator() const
1234{
1235 if (isInitialized()) {
1236 return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
1237 }
1238 return pimpl_->initiatorSession_;
1239}
1240
1241bool
1242IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& rem_candidates)
1243{
1244 if (not isInitialized()) {
1245 if (pimpl_->logger_)
1246 pimpl_->logger_->error("[ice:{}] not initialized transport", fmt::ptr(pimpl_.get()));
1247 pimpl_->is_stopped_ = true;
1248 return false;
1249 }
1250
1251 // pj_ice_strans_start_ice crashes if remote candidates array is empty
1252 if (rem_candidates.empty()) {
1253 if (pimpl_->logger_)
1254 pimpl_->logger_->error("[ice:{}] start failed: no remote candidates", fmt::ptr(pimpl_.get()));
1255 pimpl_->is_stopped_ = true;
1256 return false;
1257 }
1258
1259 auto comp_cnt = std::max(1u, getComponentCount());
1260 if (rem_candidates.size() / comp_cnt > PJ_ICE_ST_MAX_CAND - 1) {
1261 std::vector<IceCandidate> rcands;
1262 rcands.reserve(PJ_ICE_ST_MAX_CAND - 1);
1263 if (pimpl_->logger_)
1264 pimpl_->logger_->warn("[ice:{}] too much candidates detected, trim list.", fmt::ptr(pimpl_.get()));
1265 // Just trim some candidates. To avoid to only take host candidates, iterate
1266 // through the whole list and select some host, some turn and peer reflexives
1267 // It should give at least enough infos to negotiate.
1268 auto maxHosts = 8;
1269 auto maxRelays = PJ_ICE_MAX_TURN;
1270 for (auto& c : rem_candidates) {
1271 if (c.type == PJ_ICE_CAND_TYPE_HOST) {
1272 if (maxHosts == 0)
1273 continue;
1274 maxHosts -= 1;
1275 } else if (c.type == PJ_ICE_CAND_TYPE_RELAYED) {
1276 if (maxRelays == 0)
1277 continue;
1278 maxRelays -= 1;
1279 }
1280 if (rcands.size() == PJ_ICE_ST_MAX_CAND - 1)
1281 break;
1282 rcands.emplace_back(std::move(c));
1283 }
1284 rem_candidates = std::move(rcands);
1285 }
1286
1287 pj_str_t ufrag, pwd;
1288 if (pimpl_->logger_)
1289 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:d} remote candidates)",
1290 fmt::ptr(pimpl_),
1291 rem_candidates.size());
1292
1293 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1294 pj_strset(&ufrag,
1295 (char*) rem_attrs.ufrag.c_str(),
1296 rem_attrs.ufrag.size()),
1297 pj_strset(&pwd,
1298 (char*) rem_attrs.pwd.c_str(),
1299 rem_attrs.pwd.size()),
1300 rem_candidates.size(),
1301 rem_candidates.data());
1302 if (status != PJ_SUCCESS) {
1303 if (pimpl_->logger_)
1304 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_.get()), sip_utils::sip_strerror(status));
1305 pimpl_->is_stopped_ = true;
1306 return false;
1307 }
1308
1309 return true;
1310}
1311
1312bool
1313IceTransport::startIce(const SDP& sdp)
1314{
1315 if (pimpl_->streamsCount_ != 1) {
1316 if (pimpl_->logger_)
Adrien Béraud49b07192023-06-13 20:03:33 -04001317 pimpl_->logger_->error(FMT_STRING("Expected exactly one stream per SDP (found {:d} streams)"), pimpl_->streamsCount_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001318 return false;
1319 }
1320
1321 if (not isInitialized()) {
1322 if (pimpl_->logger_)
1323 pimpl_->logger_->error(FMT_STRING("[ice:{}] not initialized transport"), fmt::ptr(pimpl_));
1324 pimpl_->is_stopped_ = true;
1325 return false;
1326 }
1327
1328 for (unsigned id = 1; id <= getComponentCount(); id++) {
1329 auto candVec = getLocalCandidates(id);
1330 for (auto const& cand : candVec) {
1331 if (pimpl_->logger_)
1332 pimpl_->logger_->debug("[ice:{}] Using local candidate {:s} for comp {:d}",
1333 fmt::ptr(pimpl_), cand, id);
1334 }
1335 }
1336
1337 if (pimpl_->logger_)
1338 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:u} remote candidates)",
1339 fmt::ptr(pimpl_), sdp.candidates.size());
1340 pj_str_t ufrag, pwd;
1341
1342 std::vector<IceCandidate> rem_candidates;
1343 rem_candidates.reserve(sdp.candidates.size());
1344 IceCandidate cand;
1345 for (const auto& line : sdp.candidates) {
1346 if (parseIceAttributeLine(0, line, cand))
1347 rem_candidates.emplace_back(cand);
1348 }
1349
1350 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1351 pj_strset(&ufrag,
1352 (char*) sdp.ufrag.c_str(),
1353 sdp.ufrag.size()),
1354 pj_strset(&pwd, (char*) sdp.pwd.c_str(), sdp.pwd.size()),
1355 rem_candidates.size(),
1356 rem_candidates.data());
1357 if (status != PJ_SUCCESS) {
1358 if (pimpl_->logger_)
1359 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1360 pimpl_->is_stopped_ = true;
1361 return false;
1362 }
1363
1364 return true;
1365}
1366
1367void
1368IceTransport::cancelOperations()
1369{
1370 pimpl_->cancelOperations();
1371}
1372
1373IpAddr
1374IceTransport::getLocalAddress(unsigned comp_id) const
1375{
1376 return pimpl_->getLocalAddress(comp_id);
1377}
1378
1379IpAddr
1380IceTransport::getRemoteAddress(unsigned comp_id) const
1381{
1382 // Return the default remote address if set.
1383 // Note that the default remote addresses are the addresses
1384 // set in the 'c=' and 'a=rtcp' lines of the received SDP.
1385 // See pj_ice_strans_sendto2() for more details.
1386 if (auto defaultAddr = pimpl_->getDefaultRemoteAddress(comp_id)) {
1387 return defaultAddr;
1388 }
1389
1390 return pimpl_->getRemoteAddress(comp_id);
1391}
1392
1393const IceTransport::Attribute
1394IceTransport::getLocalAttributes() const
1395{
1396 return {pimpl_->local_ufrag_, pimpl_->local_pwd_};
1397}
1398
1399std::vector<std::string>
1400IceTransport::getLocalCandidates(unsigned comp_id) const
1401{
1402 ASSERT_COMP_ID(comp_id, getComponentCount());
1403 std::vector<std::string> res;
1404 pj_ice_sess_cand cand[MAX_CANDIDATES];
1405 unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
1406
1407 if (!isInitialized()) {
1408 return res;
1409 }
1410
1411 if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
1412 if (pimpl_->logger_)
1413 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1414 return res;
1415 }
1416
1417 res.reserve(cand_cnt);
1418 for (unsigned i = 0; i < cand_cnt; ++i) {
1419 /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544)
1420 * candidate-attribute = "candidate" ":" foundation SP component-id
1421 * SP "TCP" SP priority SP connection-address SP port SP cand-type [SP
1422 * rel-addr] [SP rel-port] SP tcp-type-ext
1423 * *(SP extension-att-name SP
1424 * extension-att-value)
1425 *
1426 * tcp-type-ext = "tcptype" SP tcp-type
1427 * tcp-type = "active" / "passive" / "so"
1428 */
1429 char ipaddr[PJ_INET6_ADDRSTRLEN];
1430 std::string tcp_type;
1431 if (cand[i].transport != PJ_CAND_UDP) {
1432 tcp_type += " tcptype";
1433 switch (cand[i].transport) {
1434 case PJ_CAND_TCP_ACTIVE:
1435 tcp_type += " active";
1436 break;
1437 case PJ_CAND_TCP_PASSIVE:
1438 tcp_type += " passive";
1439 break;
1440 case PJ_CAND_TCP_SO:
1441 default:
1442 tcp_type += " so";
1443 break;
1444 }
1445 }
1446 res.emplace_back(
1447 fmt::format("{} {} {} {} {} {} typ {}{}",
1448 sip_utils::as_view(cand[i].foundation),
1449 cand[i].comp_id,
1450 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1451 cand[i].prio,
1452 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1453 pj_sockaddr_get_port(&cand[i].addr),
1454 pj_ice_get_cand_type_name(cand[i].type),
1455 tcp_type));
1456 }
1457
1458 return res;
1459}
1460std::vector<std::string>
1461IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
1462{
1463 ASSERT_COMP_ID(compId, getComponentCount());
1464
1465 std::vector<std::string> res;
1466 pj_ice_sess_cand cand[MAX_CANDIDATES];
1467 unsigned cand_cnt = MAX_CANDIDATES;
1468
1469 if (not isInitialized()) {
1470 return res;
1471 }
1472
1473 // In the implementation, the component IDs are enumerated globally
1474 // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create
1475 // only one pj_ice_strans instance. However, the component IDs are
1476 // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in
1477 // order to be compliant with the spec.
1478
1479 auto globalCompId = streamIdx * 2 + compId;
1480 if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) {
1481 if (pimpl_->logger_)
1482 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1483 return res;
1484 }
1485
1486 res.reserve(cand_cnt);
1487 // Build ICE attributes according to RFC 6544, section 4.5.
1488 for (unsigned i = 0; i < cand_cnt; ++i) {
1489 char ipaddr[PJ_INET6_ADDRSTRLEN];
1490 std::string tcp_type;
1491 if (cand[i].transport != PJ_CAND_UDP) {
1492 tcp_type += " tcptype";
1493 switch (cand[i].transport) {
1494 case PJ_CAND_TCP_ACTIVE:
1495 tcp_type += " active";
1496 break;
1497 case PJ_CAND_TCP_PASSIVE:
1498 tcp_type += " passive";
1499 break;
1500 case PJ_CAND_TCP_SO:
1501 default:
1502 tcp_type += " so";
1503 break;
1504 }
1505 }
1506 res.emplace_back(
1507 fmt::format("{} {} {} {} {} {} typ {}{}",
1508 sip_utils::as_view(cand[i].foundation),
1509 compId,
1510 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1511 cand[i].prio,
1512 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1513 pj_sockaddr_get_port(&cand[i].addr),
1514 pj_ice_get_cand_type_name(cand[i].type),
1515 tcp_type));
1516 }
1517
1518 return res;
1519}
1520
1521bool
1522IceTransport::parseIceAttributeLine(unsigned streamIdx,
1523 const std::string& line,
1524 IceCandidate& cand) const
1525{
1526 // Silently ignore empty lines
1527 if (line.empty())
1528 return false;
1529
1530 if (streamIdx >= pimpl_->streamsCount_) {
1531 throw std::runtime_error(fmt::format("Stream index {:d} is invalid!", streamIdx));
1532 }
1533
1534 int af, cnt;
1535 char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32];
1536 pj_str_t tmpaddr;
1537 unsigned comp_id, prio, port;
1538 pj_status_t status;
1539 pj_bool_t is_tcp = PJ_FALSE;
1540
1541 // Parse ICE attribute line according to RFC-6544 section 4.5.
1542 // TODO/WARNING: There is no fail-safe in case of malformed attributes.
1543 cnt = sscanf(line.c_str(),
1544 "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n",
1545 foundation,
1546 &comp_id,
1547 transport,
1548 &prio,
1549 ipaddr,
1550 &port,
1551 type,
1552 tcp_type);
1553 if (cnt != 7 && cnt != 8) {
1554 if (pimpl_->logger_)
1555 pimpl_->logger_->error("[ice:{}] Invalid ICE candidate line: {:s}", fmt::ptr(pimpl_), line);
1556 return false;
1557 }
1558
1559 if (strcmp(transport, "TCP") == 0) {
1560 is_tcp = PJ_TRUE;
1561 }
1562
1563 pj_bzero(&cand, sizeof(IceCandidate));
1564
1565 if (strcmp(type, "host") == 0)
1566 cand.type = PJ_ICE_CAND_TYPE_HOST;
1567 else if (strcmp(type, "srflx") == 0)
1568 cand.type = PJ_ICE_CAND_TYPE_SRFLX;
1569 else if (strcmp(type, "prflx") == 0)
1570 cand.type = PJ_ICE_CAND_TYPE_PRFLX;
1571 else if (strcmp(type, "relay") == 0)
1572 cand.type = PJ_ICE_CAND_TYPE_RELAYED;
1573 else {
1574 if (pimpl_->logger_)
1575 pimpl_->logger_->warn("[ice:{}] invalid remote candidate type '{:s}'", fmt::ptr(pimpl_), type);
1576 return false;
1577 }
1578
1579 if (is_tcp) {
1580 if (strcmp(tcp_type, "active") == 0)
1581 cand.transport = PJ_CAND_TCP_ACTIVE;
1582 else if (strcmp(tcp_type, "passive") == 0)
1583 cand.transport = PJ_CAND_TCP_PASSIVE;
1584 else if (strcmp(tcp_type, "so") == 0)
1585 cand.transport = PJ_CAND_TCP_SO;
1586 else {
1587 if (pimpl_->logger_)
1588 pimpl_->logger_->warn("[ice:{}] invalid transport type type '{:s}'", fmt::ptr(pimpl_), tcp_type);
1589 return false;
1590 }
1591 } else {
1592 cand.transport = PJ_CAND_UDP;
1593 }
1594
1595 // If the component Id is enumerated relative to media, convert
1596 // it to absolute enumeration.
1597 if (comp_id <= pimpl_->compCountPerStream_) {
1598 comp_id += pimpl_->compCountPerStream_ * streamIdx;
1599 }
1600 cand.comp_id = (pj_uint8_t) comp_id;
1601
1602 cand.prio = prio;
1603
1604 if (strchr(ipaddr, ':'))
1605 af = pj_AF_INET6();
1606 else {
1607 af = pj_AF_INET();
1608 pimpl_->onlyIPv4Private_ &= IpAddr(ipaddr).isPrivate();
1609 }
1610
1611 tmpaddr = pj_str(ipaddr);
1612 pj_sockaddr_init(af, &cand.addr, NULL, 0);
1613 status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr);
1614 if (status != PJ_SUCCESS) {
1615 if (pimpl_->logger_)
1616 pimpl_->logger_->warn("[ice:{}] invalid IP address '{:s}'", fmt::ptr(pimpl_), ipaddr);
1617 return false;
1618 }
1619
1620 pj_sockaddr_set_port(&cand.addr, (pj_uint16_t) port);
1621 pj_strdup2(pimpl_->pool_.get(), &cand.foundation, foundation);
1622
1623 return true;
1624}
1625
1626ssize_t
1627IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec)
1628{
1629 ASSERT_COMP_ID(compId, getComponentCount());
1630 auto& io = pimpl_->compIO_[compId - 1];
1631 std::lock_guard<std::mutex> lk(io.mutex);
1632
1633 if (io.queue.empty()) {
1634 ec = std::make_error_code(std::errc::resource_unavailable_try_again);
1635 return -1;
1636 }
1637
1638 auto& packet = io.queue.front();
1639 const auto count = std::min(len, packet.data.size());
1640 std::copy_n(packet.data.begin(), count, buf);
1641 if (count == packet.data.size()) {
1642 io.queue.pop_front();
1643 } else {
1644 packet.data.erase(packet.data.begin(), packet.data.begin() + count);
1645 }
1646
1647 ec.clear();
1648 return count;
1649}
1650
1651ssize_t
1652IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec)
1653{
1654 ASSERT_COMP_ID(compId, getComponentCount());
1655 return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec);
1656}
1657
1658void
1659IceTransport::setOnRecv(unsigned compId, IceRecvCb cb)
1660{
1661 ASSERT_COMP_ID(compId, getComponentCount());
1662
1663 auto& io = pimpl_->compIO_[compId - 1];
1664 std::lock_guard<std::mutex> lk(io.mutex);
1665 io.recvCb = std::move(cb);
1666
1667 if (io.recvCb) {
1668 // Flush existing queue using the callback
1669 for (const auto& packet : io.queue)
1670 io.recvCb((uint8_t*) packet.data.data(), packet.data.size());
1671 io.queue.clear();
1672 }
1673}
1674
1675void
1676IceTransport::setOnShutdown(onShutdownCb&& cb)
1677{
1678 pimpl_->scb = cb;
1679}
1680
1681ssize_t
1682IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
1683{
1684 ASSERT_COMP_ID(compId, getComponentCount());
1685
1686 auto remote = getRemoteAddress(compId);
1687
1688 if (!remote) {
1689 if (pimpl_->logger_)
1690 pimpl_->logger_->error("[ice:{}] can't find remote address for component {:d}", fmt::ptr(pimpl_), compId);
1691 errno = EINVAL;
1692 return -1;
1693 }
1694
1695 std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock);
1696 if (isTCPEnabled())
1697 dlk.lock();
1698
1699 jami_tracepoint(ice_transport_send,
1700 reinterpret_cast<uint64_t>(this),
1701 compId,
1702 len,
1703 remote.toString().c_str());
1704
1705 auto status = pj_ice_strans_sendto2(pimpl_->icest_,
1706 compId,
1707 buf,
1708 len,
1709 remote.pjPtr(),
1710 remote.getLength());
1711
1712 jami_tracepoint(ice_transport_send_status, status);
1713
1714 if (status == PJ_EPENDING && isTCPEnabled()) {
1715 // NOTE; because we are in TCP, the sent size will count the header (2
1716 // bytes length).
1717 pimpl_->waitDataCv_.wait(dlk, [&] {
1718 return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_;
1719 });
1720 pimpl_->lastSentLen_ = 0;
1721 } else if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1722 if (status == PJ_EBUSY) {
1723 errno = EAGAIN;
1724 } else {
1725 if (pimpl_->logger_)
1726 pimpl_->logger_->error("[ice:{}] ice send failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1727 errno = EIO;
1728 }
1729 return -1;
1730 }
1731
1732 return len;
1733}
1734
1735bool
1736IceTransport::waitForInitialization(std::chrono::milliseconds timeout)
1737{
1738 return pimpl_->_waitForInitialization(timeout);
1739}
1740
1741ssize_t
1742IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec)
1743{
1744 ASSERT_COMP_ID(compId, getComponentCount());
1745 return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec);
1746}
1747
1748bool
1749IceTransport::isTCPEnabled()
1750{
1751 return pimpl_->isTcpEnabled();
1752}
1753
1754ICESDP
1755IceTransport::parseIceCandidates(std::string_view sdp_msg)
1756{
1757 if (pimpl_->streamsCount_ != 1) {
1758 if (pimpl_->logger_)
1759 pimpl_->logger_->error("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_);
1760 return {};
1761 }
1762
1763 ICESDP res;
1764 int nr = 0;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -04001765 for (std::string_view line; dhtnet::getline(sdp_msg, line); nr++) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001766 if (nr == 0) {
1767 res.rem_ufrag = line;
1768 } else if (nr == 1) {
1769 res.rem_pwd = line;
1770 } else {
1771 IceCandidate cand;
1772 if (parseIceAttributeLine(0, std::string(line), cand)) {
1773 if (pimpl_->logger_)
1774 pimpl_->logger_->debug("[ice:{}] Add remote candidate: {}",
1775 fmt::ptr(pimpl_),
1776 line);
1777 res.rem_candidates.emplace_back(cand);
1778 }
1779 }
1780 }
1781 return res;
1782}
1783
1784void
1785IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr)
1786{
1787 pimpl_->setDefaultRemoteAddress(comp_id, addr);
1788}
1789
1790std::string
1791IceTransport::link() const
1792{
1793 return pimpl_->link();
1794}
1795
1796//==============================================================================
1797
Adrien Béraud89933c12023-07-26 14:53:30 -04001798IceTransportFactory::IceTransportFactory(const std::shared_ptr<Logger>& logger)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001799 : cp_(new pj_caching_pool(),
1800 [](pj_caching_pool* p) {
1801 pj_caching_pool_destroy(p);
1802 delete p;
1803 })
1804 , ice_cfg_()
Adrien Béraud89933c12023-07-26 14:53:30 -04001805 , logger_(logger)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001806{
1807 pj_caching_pool_init(cp_.get(), NULL, 0);
1808
1809 pj_ice_strans_cfg_default(&ice_cfg_);
1810 ice_cfg_.stun_cfg.pf = &cp_->factory;
1811
1812 // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires
1813 // a minimum of 500ms on fixed-line links. Our usual case is wireless links.
1814 // This solves too long ICE exchange by DHT.
1815 // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout.
1816 ice_cfg_.stun_cfg.rto_msec = 500;
1817
1818 // See https://tools.ietf.org/html/rfc5245#section-8.1.1.2
1819 // If enabled, it may help speed-up the connectivity, but may cause
1820 // the nomination of sub-optimal pairs.
1821 ice_cfg_.opt.aggressive = PJ_FALSE;
1822}
1823
1824IceTransportFactory::~IceTransportFactory() {}
1825
1826std::shared_ptr<IceTransport>
1827IceTransportFactory::createTransport(std::string_view name)
1828{
1829 try {
Adrien Béraud89933c12023-07-26 14:53:30 -04001830 return std::make_shared<IceTransport>(name, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001831 } catch (const std::exception& e) {
1832 //JAMI_ERR("%s", e.what());
1833 return nullptr;
1834 }
1835}
1836
1837std::unique_ptr<IceTransport>
1838IceTransportFactory::createUTransport(std::string_view name)
1839{
1840 try {
Adrien Béraud89933c12023-07-26 14:53:30 -04001841 return std::make_unique<IceTransport>(name, logger_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001842 } catch (const std::exception& e) {
1843 //JAMI_ERR("%s", e.what());
1844 return nullptr;
1845 }
1846}
1847
1848//==============================================================================
1849
1850void
1851IceSocket::close()
1852{
1853 if (ice_transport_)
1854 ice_transport_->setOnRecv(compId_, {});
1855 ice_transport_.reset();
1856}
1857
1858ssize_t
1859IceSocket::send(const unsigned char* buf, size_t len)
1860{
1861 if (not ice_transport_)
1862 return -1;
1863 return ice_transport_->send(compId_, buf, len);
1864}
1865
1866ssize_t
1867IceSocket::waitForData(std::chrono::milliseconds timeout)
1868{
1869 if (not ice_transport_)
1870 return -1;
1871
1872 std::error_code ec;
1873 return ice_transport_->waitForData(compId_, timeout, ec);
1874}
1875
1876void
1877IceSocket::setOnRecv(IceRecvCb cb)
1878{
1879 if (ice_transport_)
1880 ice_transport_->setOnRecv(compId_, cb);
1881}
1882
1883uint16_t
1884IceSocket::getTransportOverhead()
1885{
1886 if (not ice_transport_)
1887 return 0;
1888
1889 return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? IPV4_HEADER_SIZE
1890 : IPV6_HEADER_SIZE;
1891}
1892
1893void
1894IceSocket::setDefaultRemoteAddress(const IpAddr& addr)
1895{
1896 if (ice_transport_)
1897 ice_transport_->setDefaultRemoteAddress(compId_, addr);
1898}
1899
Sébastien Blin464bdff2023-07-19 08:02:53 -04001900} // namespace dhtnet