blob: b47353be4013eaf8cfaaf0cb33f6c1ca961df578 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
3 *
4 * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21#include "ice_transport.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040022#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040023#include "ice_socket.h"
24#include "sip_utils.h"
25#include "string_utils.h"
26#include "upnp/upnp_control.h"
27#include "transport/peer_channel.h"
28#include "tracepoint/tracepoint.h"
29
30#include <opendht/logger.h>
31#include <opendht/utils.h>
32
33#include <pjlib.h>
34
35#include <map>
36#include <atomic>
37#include <queue>
38#include <mutex>
39#include <condition_variable>
40#include <thread>
41#include <utility>
42#include <tuple>
43#include <algorithm>
44#include <sstream>
45#include <chrono>
46#include <thread>
47#include <cerrno>
48
49#include "pj/limits.h"
50
51#define TRY(ret) \
52 do { \
53 if ((ret) != PJ_SUCCESS) \
54 throw std::runtime_error(#ret " failed"); \
55 } while (0)
56
57// Validate that the component ID is within the expected range
58#define ASSERT_COMP_ID(compId, compCount) \
59 do { \
60 if ((compId) == 0 or (compId) > (compCount)) \
61 throw std::runtime_error("Invalid component ID " + (std::to_string(compId))); \
62 } while (0)
63
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040064namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040065
66static constexpr unsigned STUN_MAX_PACKET_SIZE {8192};
67static constexpr uint16_t IPV6_HEADER_SIZE = 40; ///< Size in bytes of IPV6 packet header
68static constexpr uint16_t IPV4_HEADER_SIZE = 20; ///< Size in bytes of IPV4 packet header
69static constexpr int MAX_CANDIDATES {32};
70static constexpr int MAX_DESTRUCTION_TIMEOUT {3000};
71static constexpr int HANDLE_EVENT_DURATION {500};
72
73//==============================================================================
74
75using MutexGuard = std::lock_guard<std::mutex>;
76using MutexLock = std::unique_lock<std::mutex>;
77using namespace upnp;
78
79//==============================================================================
80
81class IceLock
82{
83 pj_grp_lock_t* lk_;
84
85public:
86 IceLock(pj_ice_strans* strans)
87 : lk_(pj_ice_strans_get_grp_lock(strans))
88 {
89 lock();
90 }
91
92 ~IceLock() { unlock(); }
93
94 void lock() { if (lk_) pj_grp_lock_acquire(lk_); }
95
96 void unlock() { if (lk_) pj_grp_lock_release(lk_); }
97};
98
99class IceTransport::Impl
100{
101public:
102 Impl(std::string_view name);
103 ~Impl();
104
105 void initIceInstance(const IceTransportOptions& options);
106
107 void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status);
108
109 void onReceiveData(unsigned comp_id, void* pkt, pj_size_t size);
110
111 /**
112 * Set/change transport role as initiator.
113 * Should be called before start method.
114 */
115 bool setInitiatorSession();
116
117 /**
118 * Set/change transport role as slave.
119 * Should be called before start method.
120 */
121 bool setSlaveSession();
122 bool createIceSession(pj_ice_sess_role role);
123
124 void getUFragPwd();
125
126 std::string link() const;
127
128 bool _isInitialized() const;
129 bool _isStarted() const;
130 bool _isRunning() const;
131 bool _isFailed() const;
132 bool _waitForInitialization(std::chrono::milliseconds timeout);
133
134 const pj_ice_sess_cand* getSelectedCandidate(unsigned comp_id, bool remote) const;
135 IpAddr getLocalAddress(unsigned comp_id) const;
136 IpAddr getRemoteAddress(unsigned comp_id) const;
137 static const char* getCandidateType(const pj_ice_sess_cand* cand);
138 bool isTcpEnabled() const { return config_.protocol == PJ_ICE_TP_TCP; }
139 bool addStunConfig(int af);
140 void requestUpnpMappings();
141 bool hasUpnp() const;
142 // Take a list of address pairs (local/public) and add them as
143 // reflexive candidates using STUN config.
144 void addServerReflexiveCandidates(const std::vector<std::pair<IpAddr, IpAddr>>& addrList);
145 // Generate server reflexive candidates using the published (DHT/Account) address
146 std::vector<std::pair<IpAddr, IpAddr>> setupGenericReflexiveCandidates();
147 // Generate server reflexive candidates using UPNP mappings.
148 std::vector<std::pair<IpAddr, IpAddr>> setupUpnpReflexiveCandidates();
149 void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
150 IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
151 bool handleEvents(unsigned max_msec);
152 int flushTimerHeapAndIoQueue();
153 int checkEventQueue(int maxEventToPoll);
154
155 std::shared_ptr<dht::log::Logger> logger_ {};
156
157 std::condition_variable_any iceCV_ {};
158
159 std::string sessionName_ {};
160 std::unique_ptr<pj_pool_t, decltype(&pj_pool_release)> pool_ {nullptr, pj_pool_release};
161 bool isTcp_ {false};
162 bool upnpEnabled_ {false};
163 IceTransportCompleteCb on_initdone_cb_ {};
164 IceTransportCompleteCb on_negodone_cb_ {};
165 pj_ice_strans* icest_ {nullptr};
166 unsigned streamsCount_ {0};
167 unsigned compCountPerStream_ {0};
168 unsigned compCount_ {0};
169 std::string local_ufrag_ {};
170 std::string local_pwd_ {};
171 pj_sockaddr remoteAddr_ {};
172 pj_ice_strans_cfg config_ {};
173 //std::string last_errmsg_ {};
174
175 std::atomic_bool is_stopped_ {false};
176
177 struct Packet
178 {
179 Packet(void* pkt, pj_size_t size)
180 : data {reinterpret_cast<char*>(pkt), reinterpret_cast<char*>(pkt) + size}
181 {}
182 std::vector<char> data {};
183 };
184
185 struct ComponentIO
186 {
187 std::mutex mutex;
188 std::condition_variable cv;
189 std::deque<Packet> queue;
190 IceRecvCb recvCb;
191 };
192
193 // NOTE: Component IDs start from 1, while these three vectors
194 // are indexed from 0. Conversion from ID to vector index must
195 // be done properly.
196 std::vector<ComponentIO> compIO_ {};
197 std::vector<PeerChannel> peerChannels_ {};
198 std::vector<IpAddr> iceDefaultRemoteAddr_;
199
200 // ICE controlling role. True for controller agents and false for
201 // controlled agents
202 std::atomic_bool initiatorSession_ {true};
203
204 // Local/Public addresses used by the account owning the ICE instance.
205 IpAddr accountLocalAddr_ {};
206 IpAddr accountPublicAddr_ {};
207
208 // STUN and TURN servers
209 std::vector<StunServerInfo> stunServers_;
210 std::vector<TurnServerInfo> turnServers_;
211
212 /**
213 * Returns the IP of each candidate for a given component in the ICE session
214 */
215 struct LocalCandidate
216 {
217 IpAddr addr;
218 pj_ice_cand_transport transport;
219 };
220
221 std::shared_ptr<upnp::Controller> upnp_ {};
222 std::mutex upnpMutex_ {};
223 std::map<Mapping::key_t, Mapping> upnpMappings_;
224 std::mutex upnpMappingsMutex_ {};
225
226 bool onlyIPv4Private_ {true};
227
228 // IO/Timer events are handled by following thread
229 std::thread thread_ {};
230 std::atomic_bool threadTerminateFlags_ {false};
231
232 // Wait data on components
233 mutable std::mutex sendDataMutex_ {};
234 std::condition_variable waitDataCv_ = {};
235 pj_size_t lastSentLen_ {0};
236 bool destroying_ {false};
237 onShutdownCb scb {};
238
239 void cancelOperations()
240 {
241 for (auto& c : peerChannels_)
242 c.stop();
243 std::lock_guard<std::mutex> lk(sendDataMutex_);
244 destroying_ = true;
245 waitDataCv_.notify_all();
246 }
247};
248
249//==============================================================================
250
251/**
252 * Add stun/turn configuration or default host as candidates
253 */
254
255static void
256add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info)
257{
258 if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN)
259 throw std::runtime_error("Too many STUN configurations");
260
261 IpAddr ip {info.uri};
262
263 // Given URI cannot be DNS resolved or not IPv4 or IPv6?
264 // This prevents a crash into PJSIP when ip.toString() is called.
265 if (ip.getFamily() == AF_UNSPEC) {
266 /*JAMI_DBG("[ice (%s)] STUN server '%s' not used, unresolvable address",
267 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
268 info.uri.c_str());*/
269 return;
270 }
271
272 auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++];
273 pj_ice_strans_stun_cfg_default(&stun);
274 pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str());
275 stun.af = ip.getFamily();
276 if (!(stun.port = ip.getPort()))
277 stun.port = PJ_STUN_PORT;
278 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
279 stun.conn_type = cfg.stun.conn_type;
280 /*JAMI_DBG("[ice (%s)] added stun server '%s', port %u",
281 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
282 pj_strbuf(&stun.server),
283 stun.port);*/
284}
285
286static void
287add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info)
288{
289 if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN)
290 throw std::runtime_error("Too many TURN servers");
291
292 IpAddr ip {info.uri};
293
294 // Same comment as add_stun_server()
295 if (ip.getFamily() == AF_UNSPEC) {
296 /*JAMI_DBG("[ice (%s)] TURN server '%s' not used, unresolvable address",
297 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
298 info.uri.c_str());*/
299 return;
300 }
301
302 auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++];
303 pj_ice_strans_turn_cfg_default(&turn);
304 pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str());
305 turn.af = ip.getFamily();
306 if (!(turn.port = ip.getPort()))
307 turn.port = PJ_STUN_PORT;
308 turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
309 turn.conn_type = cfg.turn.conn_type;
310
311 // Authorization (only static plain password supported yet)
312 if (not info.password.empty()) {
313 turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
314 turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
315 pj_strset(&turn.auth_cred.data.static_cred.realm,
316 (char*) info.realm.c_str(),
317 info.realm.size());
318 pj_strset(&turn.auth_cred.data.static_cred.username,
319 (char*) info.username.c_str(),
320 info.username.size());
321 pj_strset(&turn.auth_cred.data.static_cred.data,
322 (char*) info.password.c_str(),
323 info.password.size());
324 }
325
326 /*JAMI_DBG("[ice (%s)] added turn server '%s', port %u",
327 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
328 pj_strbuf(&turn.server),
329 turn.port);*/
330}
331
332//==============================================================================
333
334IceTransport::Impl::Impl(std::string_view name)
335 : sessionName_(name)
336{
337 if (logger_)
338 logger_->debug("[ice:{}] Creating IceTransport session for \"{:s}\"", fmt::ptr(this), name);
339}
340
341IceTransport::Impl::~Impl()
342{
343 if (logger_)
344 logger_->debug("[ice:{}] destroying {}", fmt::ptr(this), fmt::ptr(icest_));
345
346 threadTerminateFlags_ = true;
347
348 if (thread_.joinable()) {
349 thread_.join();
350 }
351
352 if (icest_) {
353 pj_ice_strans* strans = nullptr;
354
355 std::swap(strans, icest_);
356
357 // must be done before ioqueue/timer destruction
358 if (logger_)
359 logger_->debug("[ice:{}] Destroying ice_strans {}", pj_ice_strans_get_user_data(strans), fmt::ptr(strans));
360
361 pj_ice_strans_stop_ice(strans);
362 pj_ice_strans_destroy(strans);
363
364 // NOTE: This last timer heap and IO queue polling is necessary to close
365 // TURN socket.
366 // Because when destroying the TURN session pjproject creates a pj_timer
367 // to postpone the TURN destruction. This timer is only called if we poll
368 // the event queue.
369
370 int ret = flushTimerHeapAndIoQueue();
371
372 if (ret < 0) {
373 if (logger_)
374 logger_->error("[ice:{}] IO queue polling failed", fmt::ptr(this));
375 } else if (ret > 0) {
376 if (logger_)
377 logger_->error("[ice:{}] Unexpected left timer in timer heap. "
378 "Please report the bug",
379 fmt::ptr(this));
380 }
381
382 if (checkEventQueue(1) > 0) {
383 if (logger_)
384 logger_->warn("[ice:{}] Unexpected left events in IO queue", fmt::ptr(this));
385 }
386
387 if (config_.stun_cfg.ioqueue)
388 pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
389
390 if (config_.stun_cfg.timer_heap)
391 pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
392 }
393
394 if (logger_)
395 logger_->debug("[ice:%p] done destroying", fmt::ptr(this));
396 if (scb)
397 scb();
398}
399
400void
401IceTransport::Impl::initIceInstance(const IceTransportOptions& options)
402{
403 isTcp_ = options.tcpEnable;
404 upnpEnabled_ = options.upnpEnable;
405 on_initdone_cb_ = options.onInitDone;
406 on_negodone_cb_ = options.onNegoDone;
407 streamsCount_ = options.streamsCount;
408 compCountPerStream_ = options.compCountPerStream;
409 compCount_ = streamsCount_ * compCountPerStream_;
410 compIO_ = std::vector<ComponentIO>(compCount_);
411 peerChannels_ = std::vector<PeerChannel>(compCount_);
412 iceDefaultRemoteAddr_.resize(compCount_);
413 initiatorSession_ = options.master;
414 accountLocalAddr_ = std::move(options.accountLocalAddr);
415 accountPublicAddr_ = std::move(options.accountPublicAddr);
416 stunServers_ = std::move(options.stunServers);
417 turnServers_ = std::move(options.turnServers);
418
419 if (logger_)
420 logger_->debug("[ice:{}] Initializing the session - comp count {} - as a {}",
421 fmt::ptr(this),
422 compCount_,
423 initiatorSession_ ? "master" : "slave");
424
425 if (upnpEnabled_)
Adrien Béraud25c30c42023-07-05 13:46:54 -0400426 upnp_.reset(new upnp::Controller(options.upnpContext));
Adrien Béraud612b55b2023-05-29 10:42:04 -0400427
428 config_ = options.factory->getIceCfg(); // config copy
429 if (isTcp_) {
430 config_.protocol = PJ_ICE_TP_TCP;
431 config_.stun.conn_type = PJ_STUN_TP_TCP;
432 config_.turn.conn_type = PJ_TURN_TP_TCP;
433 } else {
434 config_.protocol = PJ_ICE_TP_UDP;
435 config_.stun.conn_type = PJ_STUN_TP_UDP;
436 config_.turn.conn_type = PJ_TURN_TP_UDP;
437 }
438
439 pool_.reset(
440 pj_pool_create(options.factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL));
441 if (not pool_)
442 throw std::runtime_error("pj_pool_create() failed");
443
444 // Note: For server reflexive candidates, UPNP mappings will
445 // be used if available. Then, the public address learnt during
446 // the account registration process will be added only if it
447 // differs from the UPNP public address.
448 // Also note that UPNP candidates should be added first in order
449 // to have a higher priority when performing the connectivity
450 // checks.
451 // STUN configs layout:
452 // - index 0 : host IPv4
453 // - index 1 : host IPv6
454 // - index 2 : upnp/generic srflx IPv4.
455 // - index 3 : generic srflx (if upnp exists and different)
456
457 config_.stun_tp_cnt = 0;
458
459 if (logger_)
460 logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
461 addStunConfig(pj_AF_INET());
462 addStunConfig(pj_AF_INET6());
463
464 std::vector<std::pair<IpAddr, IpAddr>> upnpSrflxCand;
465 if (upnp_) {
466 requestUpnpMappings();
467 upnpSrflxCand = setupUpnpReflexiveCandidates();
468 if (not upnpSrflxCand.empty()) {
469 addServerReflexiveCandidates(upnpSrflxCand);
470 if (logger_)
471 logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
472 }
473 }
474
475 auto genericSrflxCand = setupGenericReflexiveCandidates();
476
477 if (not genericSrflxCand.empty()) {
478 // Generic srflx candidates will be added only if different
479 // from upnp candidates.
480 if (upnpSrflxCand.empty()
481 or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) {
482 addServerReflexiveCandidates(genericSrflxCand);
483 if (logger_)
484 logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
485 }
486 }
487
488 if (upnpSrflxCand.empty() and genericSrflxCand.empty()) {
489 if (logger_)
490 logger_->warn("[ice:{}] No server reflexive candidates added", fmt::ptr(this));
491 }
492
493 pj_ice_strans_cb icecb;
494 pj_bzero(&icecb, sizeof(icecb));
495
496 icecb.on_rx_data = [](pj_ice_strans* ice_st,
497 unsigned comp_id,
498 void* pkt,
499 pj_size_t size,
500 const pj_sockaddr_t* /*src_addr*/,
501 unsigned /*src_addr_len*/) {
502 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
503 tr->onReceiveData(comp_id, pkt, size);
504 };
505
506 icecb.on_ice_complete = [](pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) {
507 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
508 tr->onComplete(ice_st, op, status);
509 };
510
511 if (isTcp_) {
512 icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
513 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
514 std::lock_guard lk(tr->sendDataMutex_);
515 tr->lastSentLen_ += size;
516 tr->waitDataCv_.notify_all();
517 }
518 };
519 }
520
521 icecb.on_destroy = [](pj_ice_strans* ice_st) {
522 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
523 tr->cancelOperations(); // Avoid upper layer to manage this ; Stop read operations
524 };
525
526 // Add STUN servers
527 for (auto& server : stunServers_)
528 add_stun_server(*pool_, config_, server);
529
530 // Add TURN servers
531 for (auto& server : turnServers_)
532 add_turn_server(*pool_, config_, server);
533
534 static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64);
535 TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap));
536 TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue));
537 std::ostringstream sessionName {};
538 // We use the instance pointer as the PJNATH session name in order
539 // to easily identify the logs reported by PJNATH.
540 sessionName << this;
541 pj_status_t status = pj_ice_strans_create(sessionName.str().c_str(),
542 &config_,
543 compCount_,
544 this,
545 &icecb,
546 &icest_);
547
548 if (status != PJ_SUCCESS || icest_ == nullptr) {
549 throw std::runtime_error("pj_ice_strans_create() failed");
550 }
551
552 // Must be created after any potential failure
553 thread_ = std::thread([this] {
554 while (not threadTerminateFlags_) {
555 // NOTE: handleEvents can return false in this case
556 // but here we don't care if there is event or not.
557 handleEvents(HANDLE_EVENT_DURATION);
558 }
559 });
560}
561
562bool
563IceTransport::Impl::_isInitialized() const
564{
565 if (auto *icest = icest_) {
566 auto state = pj_ice_strans_get_state(icest);
567 return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
568 }
569 return false;
570}
571
572bool
573IceTransport::Impl::_isStarted() const
574{
575 if (auto *icest = icest_) {
576 auto state = pj_ice_strans_get_state(icest);
577 return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
578 }
579 return false;
580}
581
582bool
583IceTransport::Impl::_isRunning() const
584{
585 if (auto *icest = icest_) {
586 auto state = pj_ice_strans_get_state(icest);
587 return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
588 }
589 return false;
590}
591
592bool
593IceTransport::Impl::_isFailed() const
594{
595 if (auto *icest = icest_)
596 return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
597 return false;
598}
599
600bool
601IceTransport::Impl::handleEvents(unsigned max_msec)
602{
603 // By tests, never seen more than two events per 500ms
604 static constexpr auto MAX_NET_EVENTS = 2;
605
606 pj_time_val max_timeout = {0, static_cast<long>(max_msec)};
607 pj_time_val timeout = {0, 0};
608 unsigned net_event_count = 0;
609
610 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
611 auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32;
612
613 // timeout limitation
614 if (hasActiveTimer)
615 pj_time_val_normalize(&timeout);
616
617 if (PJ_TIME_VAL_GT(timeout, max_timeout)) {
618 timeout = max_timeout;
619 }
620
621 do {
622 auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
623
624 // timeout
625 if (not n_events)
626 return hasActiveTimer;
627
628 // error
629 if (n_events < 0) {
630 const auto err = pj_get_os_error();
631 // Kept as debug as some errors are "normal" in regular context
632 if (logger_)
633 logger_->debug("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
634 std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
635 return hasActiveTimer;
636 }
637
638 net_event_count += n_events;
639 timeout.sec = timeout.msec = 0;
640 } while (net_event_count < MAX_NET_EVENTS);
641 return hasActiveTimer;
642}
643
644int
645IceTransport::Impl::flushTimerHeapAndIoQueue()
646{
647 pj_time_val timerTimeout = {0, 0};
648 pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION};
649 bool hasActiveTimer = false;
650 std::chrono::milliseconds totalWaitTime {0};
651 auto const start = std::chrono::steady_clock::now();
652 // We try to process pending events as fast as possible to
653 // speed-up the release.
654 int maxEventToProcess = 10;
655
656 do {
657 if (checkEventQueue(maxEventToProcess) < 0)
658 return -1;
659
660 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout);
661 hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32);
662
663 if (hasActiveTimer) {
664 pj_time_val_normalize(&timerTimeout);
665 auto waitTime = std::chrono::milliseconds(
666 std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime)));
667 std::this_thread::sleep_for(waitTime);
668 totalWaitTime += waitTime;
669 }
670 } while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT));
671
672 auto duration = std::chrono::steady_clock::now() - start;
673 if (logger_)
674 logger_->debug("[ice:{}] Timer heap flushed after {}", fmt::ptr(this), dht::print_duration(duration));
675
676 return static_cast<int>(pj_timer_heap_count(config_.stun_cfg.timer_heap));
677}
678
679int
680IceTransport::Impl::checkEventQueue(int maxEventToPoll)
681{
682 pj_time_val timeout = {0, 0};
683 int eventCount = 0;
684 int events = 0;
685
686 do {
687 events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
688 if (events < 0) {
689 const auto err = pj_get_os_error();
690 if (logger_)
691 logger_->error("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
692 return events;
693 }
694
695 eventCount += events;
696
697 } while (events > 0 && eventCount < maxEventToPoll);
698
699 return eventCount;
700}
701
702void
703IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status)
704{
705 const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization"
706 : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation"
707 : "unknown_op";
708
709 const bool done = status == PJ_SUCCESS;
710 if (done) {
711 if (logger_)
712 logger_->debug("[ice:{}] {:s} {:s} success",
713 fmt::ptr(this),
714 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
715 opname);
716 } else {
717 if (logger_)
718 logger_->error("[ice:{}] {:s} {:s} failed: {:s}",
719 fmt::ptr(this),
720 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
721 opname,
722 sip_utils::sip_strerror(status));
723 }
724
725 if (done and op == PJ_ICE_STRANS_OP_INIT) {
726 if (initiatorSession_)
727 setInitiatorSession();
728 else
729 setSlaveSession();
730 }
731
732 if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
733 on_initdone_cb_(done);
734 else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) {
735 if (done) {
736 // Dump of connection pairs
737 if (logger_)
738 logger_->debug("[ice:{}] {:s} connection pairs ([comp id] local [type] <-> remote [type]):\n{:s}",
739 fmt::ptr(this),
740 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
741 link());
742 }
743 if (on_negodone_cb_)
744 on_negodone_cb_(done);
745 }
746
747 iceCV_.notify_all();
748}
749
750std::string
751IceTransport::Impl::link() const
752{
753 std::ostringstream out;
754 for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) {
755 auto absIdx = strm;
756 auto comp = (strm + 1) / compCountPerStream_;
757 auto laddr = getLocalAddress(absIdx);
758 auto raddr = getRemoteAddress(absIdx);
759
760 if (laddr and laddr.getPort() != 0 and raddr and raddr.getPort() != 0) {
761 out << " [" << comp << "] " << laddr.toString(true, true) << " ["
762 << getCandidateType(getSelectedCandidate(absIdx, false)) << "] "
763 << " <-> " << raddr.toString(true, true) << " ["
764 << getCandidateType(getSelectedCandidate(absIdx, true)) << "] " << '\n';
765 } else {
766 out << " [" << comp << "] disabled\n";
767 }
768 }
769 return out.str();
770}
771
772bool
773IceTransport::Impl::setInitiatorSession()
774{
775 if (logger_)
776 logger_->debug("[ice:{}] as master", fmt::ptr(this));
777 initiatorSession_ = true;
778 if (_isInitialized()) {
779 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING);
780 if (status != PJ_SUCCESS) {
781 if (logger_)
782 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
783 return false;
784 }
785 return true;
786 }
787 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING);
788}
789
790bool
791IceTransport::Impl::setSlaveSession()
792{
793 if (logger_)
794 logger_->debug("[ice:{}] as slave", fmt::ptr(this));
795 initiatorSession_ = false;
796 if (_isInitialized()) {
797 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED);
798 if (status != PJ_SUCCESS) {
799 if (logger_)
800 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
801 return false;
802 }
803 return true;
804 }
805 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED);
806}
807
808const pj_ice_sess_cand*
809IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const
810{
811 ASSERT_COMP_ID(comp_id, compCount_);
812
813 // Return the selected candidate pair. Might not be the nominated pair if
814 // ICE has not concluded yet, but should be the nominated pair afterwards.
815 if (not _isRunning()) {
816 if (logger_)
817 logger_->error("[ice:{}] ICE transport is not running", fmt::ptr(this));
818 return nullptr;
819 }
820
821 const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id);
822 if (sess == nullptr) {
823 if (logger_)
824 logger_->warn("[ice:{}] Component {} has no valid pair (disabled)", fmt::ptr(this), comp_id);
825 return nullptr;
826 }
827
828 if (remote)
829 return sess->rcand;
830 else
831 return sess->lcand;
832}
833
834IpAddr
835IceTransport::Impl::getLocalAddress(unsigned comp_id) const
836{
837 ASSERT_COMP_ID(comp_id, compCount_);
838
839 if (auto cand = getSelectedCandidate(comp_id, false))
840 return cand->addr;
841
842 return {};
843}
844
845IpAddr
846IceTransport::Impl::getRemoteAddress(unsigned comp_id) const
847{
848 ASSERT_COMP_ID(comp_id, compCount_);
849
850 if (auto cand = getSelectedCandidate(comp_id, true))
851 return cand->addr;
852
853 return {};
854}
855
856const char*
857IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand)
858{
859 auto name = cand ? pj_ice_get_cand_type_name(cand->type) : nullptr;
860 return name ? name : "?";
861}
862
863void
864IceTransport::Impl::getUFragPwd()
865{
866 if (icest_) {
867 pj_str_t local_ufrag, local_pwd;
868
869 pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr);
870 local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
871 local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
872 }
873}
874
875bool
876IceTransport::Impl::createIceSession(pj_ice_sess_role role)
877{
878 if (not icest_) {
879 return false;
880 }
881
882 if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) {
883 if (logger_)
884 logger_->error("[ice:{}] pj_ice_strans_init_ice() failed", fmt::ptr(this));
885 return false;
886 }
887
888 // Fetch some information on local configuration
889 getUFragPwd();
890
891 if (logger_)
892 logger_->debug("[ice:{}] (local) ufrag=%s, pwd=%s", fmt::ptr(this), local_ufrag_.c_str(), local_pwd_.c_str());
893
894 return true;
895}
896
897bool
898IceTransport::Impl::addStunConfig(int af)
899{
900 if (config_.stun_tp_cnt >= PJ_ICE_MAX_STUN) {
901 if (logger_)
902 logger_->error("Max number of STUN configurations reached (%i)", PJ_ICE_MAX_STUN);
903 return false;
904 }
905
906 if (af != pj_AF_INET() and af != pj_AF_INET6()) {
907 if (logger_)
908 logger_->error("Invalid address familly (%i)", af);
909 return false;
910 }
911
912 auto& stun = config_.stun_tp[config_.stun_tp_cnt++];
913
914 pj_ice_strans_stun_cfg_default(&stun);
915 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
916 stun.af = af;
917 stun.conn_type = config_.stun.conn_type;
918
919 if (logger_)
920 logger_->debug("[ice:{}] added host stun config for {:s} transport",
921 fmt::ptr(this),
922 config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
923
924 return true;
925}
926
927void
928IceTransport::Impl::requestUpnpMappings()
929{
930 // Must be called once !
931
932 std::lock_guard<std::mutex> lock(upnpMutex_);
933
934 if (not upnp_)
935 return;
936
937 auto transport = isTcpEnabled() ? PJ_CAND_TCP_PASSIVE : PJ_CAND_UDP;
938 auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP;
939
940 // Request upnp mapping for each component.
941 for (unsigned id = 1; id <= compCount_; id++) {
942 // Set port number to 0 to get any available port.
943 Mapping requestedMap(portType);
944
945 // Request the mapping
946 Mapping::sharedPtr_t mapPtr = upnp_->reserveMapping(requestedMap);
947
948 // To use a mapping, it must be valid, open and has valid host address.
949 if (mapPtr and mapPtr->getMapKey() and (mapPtr->getState() == MappingState::OPEN)
950 and mapPtr->hasValidHostAddress()) {
951 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
952 auto ret = upnpMappings_.emplace(mapPtr->getMapKey(), *mapPtr);
953 if (ret.second) {
954 if (logger_)
955 logger_->debug("[ice:{}] UPNP mapping {:s} successfully allocated",
956 fmt::ptr(this),
957 mapPtr->toString(true));
958 } else {
959 if (logger_)
960 logger_->warn("[ice:{}] UPNP mapping {:s} already in the list!",
961 fmt::ptr(this),
962 mapPtr->toString());
963 }
964 } else {
965 if (logger_)
966 logger_->warn("[ice:{}] UPNP mapping request failed!", fmt::ptr(this));
967 upnp_->releaseMapping(requestedMap);
968 }
969 }
970}
971
972bool
973IceTransport::Impl::hasUpnp() const
974{
975 return upnp_ and upnpMappings_.size() == compCount_;
976}
977
978void
979IceTransport::Impl::addServerReflexiveCandidates(
980 const std::vector<std::pair<IpAddr, IpAddr>>& addrList)
981{
982 if (addrList.size() != compCount_) {
983 if (logger_)
984 logger_->warn("[ice:{}] Provided addr list size {} does not match component count {}",
985 fmt::ptr(this),
986 addrList.size(),
987 compCount_);
988 return;
989 }
990 if (compCount_ > PJ_ICE_MAX_COMP) {
991 if (logger_)
992 logger_->error("[ice:{}] Too many components", fmt::ptr(this));
993 return;
994 }
995
996 // Add config for server reflexive candidates (UPNP or from DHT).
997 if (not addStunConfig(pj_AF_INET()))
998 return;
999
1000 assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN);
1001 auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1];
1002
1003 for (unsigned id = 1; id <= compCount_; id++) {
1004 auto idx = id - 1;
1005 auto& localAddr = addrList[idx].first;
1006 auto& publicAddr = addrList[idx].second;
1007
1008 if (logger_)
1009 logger_->debug("[ice:{}] Add srflx reflexive candidates [{:s} : {:s}] for comp {:d}",
1010 fmt::ptr(this),
1011 localAddr.toString(true),
1012 publicAddr.toString(true),
1013 id);
1014
1015 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr());
1016 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr());
1017
1018 if (isTcpEnabled()) {
1019 if (publicAddr.getPort() == 9) {
1020 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE;
1021 } else {
1022 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE;
1023 }
1024 } else {
1025 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP;
1026 }
1027 }
1028
1029 stun.cfg.user_mapping_cnt = compCount_;
1030}
1031
1032std::vector<std::pair<IpAddr, IpAddr>>
1033IceTransport::Impl::setupGenericReflexiveCandidates()
1034{
1035 if (not accountLocalAddr_) {
1036 if (logger_)
1037 logger_->warn("[ice:{}] Missing local address, generic srflx candidates wont be generated!",
1038 fmt::ptr(this));
1039 return {};
1040 }
1041
1042 if (not accountPublicAddr_) {
1043 if (logger_)
1044 logger_->warn("[ice:{}] Missing public address, generic srflx candidates wont be generated!",
1045 fmt::ptr(this));
1046 return {};
1047 }
1048
1049 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1050 auto isTcp = isTcpEnabled();
1051
1052 addrList.reserve(compCount_);
1053 for (unsigned id = 1; id <= compCount_; id++) {
1054 // For TCP, the type is set to active, because most likely the incoming
1055 // connection will be blocked by the NAT.
1056 // For UDP use random port number.
1057 uint16_t port = isTcp ? 9
1058 : upnp::Controller::generateRandomPort(isTcp ? PortType::TCP
1059 : PortType::UDP);
1060
1061 accountLocalAddr_.setPort(port);
1062 accountPublicAddr_.setPort(port);
1063 addrList.emplace_back(accountLocalAddr_, accountPublicAddr_);
1064 }
1065
1066 return addrList;
1067}
1068
1069std::vector<std::pair<IpAddr, IpAddr>>
1070IceTransport::Impl::setupUpnpReflexiveCandidates()
1071{
1072 // Add UPNP server reflexive candidates if available.
1073 if (not hasUpnp())
1074 return {};
1075
1076 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
1077
1078 if (upnpMappings_.size() < (size_t)compCount_) {
1079 if (logger_)
1080 logger_->warn("[ice:{}] Not enough mappings {:d}. Expected {:d}",
1081 fmt::ptr(this),
1082 upnpMappings_.size(),
1083 compCount_);
1084 return {};
1085 }
1086
1087 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1088
1089 addrList.reserve(upnpMappings_.size());
1090 for (auto const& [_, map] : upnpMappings_) {
1091 assert(map.getMapKey());
1092 IpAddr localAddr {map.getInternalAddress()};
1093 localAddr.setPort(map.getInternalPort());
1094 IpAddr publicAddr {map.getExternalAddress()};
1095 publicAddr.setPort(map.getExternalPort());
1096 addrList.emplace_back(localAddr, publicAddr);
1097 }
1098
1099 return addrList;
1100}
1101
1102void
1103IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr)
1104{
1105 ASSERT_COMP_ID(compId, compCount_);
1106
1107 iceDefaultRemoteAddr_[compId - 1] = addr;
1108 // The port does not matter. Set it 0 to avoid confusion.
1109 iceDefaultRemoteAddr_[compId - 1].setPort(0);
1110}
1111
1112IpAddr
1113IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const
1114{
1115 ASSERT_COMP_ID(compId, compCount_);
1116 return iceDefaultRemoteAddr_[compId - 1];
1117}
1118
1119void
1120IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size)
1121{
1122 ASSERT_COMP_ID(comp_id, compCount_);
1123
1124 jami_tracepoint_if_enabled(ice_transport_recv,
1125 reinterpret_cast<uint64_t>(this),
1126 comp_id,
1127 size,
1128 getRemoteAddress(comp_id).toString().c_str());
1129 if (size == 0)
1130 return;
1131
1132 {
1133 auto& io = compIO_[comp_id - 1];
1134 std::lock_guard<std::mutex> lk(io.mutex);
1135
1136 if (io.recvCb) {
1137 io.recvCb((uint8_t*) pkt, size);
1138 return;
1139 }
1140 }
1141
1142 std::error_code ec;
1143 auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec);
1144 if (err < 0) {
1145 if (logger_)
1146 logger_->error("[ice:{}] rx: channel is closed", fmt::ptr(this));
1147 }
1148}
1149
1150bool
1151IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout)
1152{
1153 IceLock lk(icest_);
1154
1155 if (not iceCV_.wait_for(lk, timeout, [this] {
1156 return threadTerminateFlags_ or _isInitialized() or _isFailed();
1157 })) {
1158 if (logger_)
1159 logger_->warn("[ice:{}] waitForInitialization: timeout", fmt::ptr(this));
1160 return false;
1161 }
1162
1163 return _isInitialized();
1164}
1165
1166//==============================================================================
1167
1168IceTransport::IceTransport(std::string_view name)
1169 : pimpl_ {std::make_unique<Impl>(name)}
1170{}
1171
1172IceTransport::~IceTransport()
1173{
1174 cancelOperations();
1175}
1176
1177const std::shared_ptr<dht::log::Logger>&
1178IceTransport::logger() const
1179{
1180 return pimpl_->logger_;
1181}
1182
1183void
1184IceTransport::initIceInstance(const IceTransportOptions& options)
1185{
1186 pimpl_->initIceInstance(options);
1187 jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this));
1188}
1189
1190bool
1191IceTransport::isInitialized() const
1192{
1193 IceLock lk(pimpl_->icest_);
1194 return pimpl_->_isInitialized();
1195}
1196
1197bool
1198IceTransport::isStarted() const
1199{
1200 IceLock lk(pimpl_->icest_);
1201 return pimpl_->_isStarted();
1202}
1203
1204bool
1205IceTransport::isRunning() const
1206{
1207 if (!pimpl_->icest_)
1208 return false;
1209 IceLock lk(pimpl_->icest_);
1210 return pimpl_->_isRunning();
1211}
1212
1213bool
1214IceTransport::isFailed() const
1215{
1216 return pimpl_->_isFailed();
1217}
1218
1219unsigned
1220IceTransport::getComponentCount() const
1221{
1222 return pimpl_->compCount_;
1223}
1224
1225bool
1226IceTransport::setSlaveSession()
1227{
1228 return pimpl_->setSlaveSession();
1229}
1230bool
1231IceTransport::setInitiatorSession()
1232{
1233 return pimpl_->setInitiatorSession();
1234}
1235
1236bool
1237IceTransport::isInitiator() const
1238{
1239 if (isInitialized()) {
1240 return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
1241 }
1242 return pimpl_->initiatorSession_;
1243}
1244
1245bool
1246IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& rem_candidates)
1247{
1248 if (not isInitialized()) {
1249 if (pimpl_->logger_)
1250 pimpl_->logger_->error("[ice:{}] not initialized transport", fmt::ptr(pimpl_.get()));
1251 pimpl_->is_stopped_ = true;
1252 return false;
1253 }
1254
1255 // pj_ice_strans_start_ice crashes if remote candidates array is empty
1256 if (rem_candidates.empty()) {
1257 if (pimpl_->logger_)
1258 pimpl_->logger_->error("[ice:{}] start failed: no remote candidates", fmt::ptr(pimpl_.get()));
1259 pimpl_->is_stopped_ = true;
1260 return false;
1261 }
1262
1263 auto comp_cnt = std::max(1u, getComponentCount());
1264 if (rem_candidates.size() / comp_cnt > PJ_ICE_ST_MAX_CAND - 1) {
1265 std::vector<IceCandidate> rcands;
1266 rcands.reserve(PJ_ICE_ST_MAX_CAND - 1);
1267 if (pimpl_->logger_)
1268 pimpl_->logger_->warn("[ice:{}] too much candidates detected, trim list.", fmt::ptr(pimpl_.get()));
1269 // Just trim some candidates. To avoid to only take host candidates, iterate
1270 // through the whole list and select some host, some turn and peer reflexives
1271 // It should give at least enough infos to negotiate.
1272 auto maxHosts = 8;
1273 auto maxRelays = PJ_ICE_MAX_TURN;
1274 for (auto& c : rem_candidates) {
1275 if (c.type == PJ_ICE_CAND_TYPE_HOST) {
1276 if (maxHosts == 0)
1277 continue;
1278 maxHosts -= 1;
1279 } else if (c.type == PJ_ICE_CAND_TYPE_RELAYED) {
1280 if (maxRelays == 0)
1281 continue;
1282 maxRelays -= 1;
1283 }
1284 if (rcands.size() == PJ_ICE_ST_MAX_CAND - 1)
1285 break;
1286 rcands.emplace_back(std::move(c));
1287 }
1288 rem_candidates = std::move(rcands);
1289 }
1290
1291 pj_str_t ufrag, pwd;
1292 if (pimpl_->logger_)
1293 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:d} remote candidates)",
1294 fmt::ptr(pimpl_),
1295 rem_candidates.size());
1296
1297 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1298 pj_strset(&ufrag,
1299 (char*) rem_attrs.ufrag.c_str(),
1300 rem_attrs.ufrag.size()),
1301 pj_strset(&pwd,
1302 (char*) rem_attrs.pwd.c_str(),
1303 rem_attrs.pwd.size()),
1304 rem_candidates.size(),
1305 rem_candidates.data());
1306 if (status != PJ_SUCCESS) {
1307 if (pimpl_->logger_)
1308 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_.get()), sip_utils::sip_strerror(status));
1309 pimpl_->is_stopped_ = true;
1310 return false;
1311 }
1312
1313 return true;
1314}
1315
1316bool
1317IceTransport::startIce(const SDP& sdp)
1318{
1319 if (pimpl_->streamsCount_ != 1) {
1320 if (pimpl_->logger_)
Adrien Béraud49b07192023-06-13 20:03:33 -04001321 pimpl_->logger_->error(FMT_STRING("Expected exactly one stream per SDP (found {:d} streams)"), pimpl_->streamsCount_);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001322 return false;
1323 }
1324
1325 if (not isInitialized()) {
1326 if (pimpl_->logger_)
1327 pimpl_->logger_->error(FMT_STRING("[ice:{}] not initialized transport"), fmt::ptr(pimpl_));
1328 pimpl_->is_stopped_ = true;
1329 return false;
1330 }
1331
1332 for (unsigned id = 1; id <= getComponentCount(); id++) {
1333 auto candVec = getLocalCandidates(id);
1334 for (auto const& cand : candVec) {
1335 if (pimpl_->logger_)
1336 pimpl_->logger_->debug("[ice:{}] Using local candidate {:s} for comp {:d}",
1337 fmt::ptr(pimpl_), cand, id);
1338 }
1339 }
1340
1341 if (pimpl_->logger_)
1342 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:u} remote candidates)",
1343 fmt::ptr(pimpl_), sdp.candidates.size());
1344 pj_str_t ufrag, pwd;
1345
1346 std::vector<IceCandidate> rem_candidates;
1347 rem_candidates.reserve(sdp.candidates.size());
1348 IceCandidate cand;
1349 for (const auto& line : sdp.candidates) {
1350 if (parseIceAttributeLine(0, line, cand))
1351 rem_candidates.emplace_back(cand);
1352 }
1353
1354 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1355 pj_strset(&ufrag,
1356 (char*) sdp.ufrag.c_str(),
1357 sdp.ufrag.size()),
1358 pj_strset(&pwd, (char*) sdp.pwd.c_str(), sdp.pwd.size()),
1359 rem_candidates.size(),
1360 rem_candidates.data());
1361 if (status != PJ_SUCCESS) {
1362 if (pimpl_->logger_)
1363 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1364 pimpl_->is_stopped_ = true;
1365 return false;
1366 }
1367
1368 return true;
1369}
1370
1371void
1372IceTransport::cancelOperations()
1373{
1374 pimpl_->cancelOperations();
1375}
1376
1377IpAddr
1378IceTransport::getLocalAddress(unsigned comp_id) const
1379{
1380 return pimpl_->getLocalAddress(comp_id);
1381}
1382
1383IpAddr
1384IceTransport::getRemoteAddress(unsigned comp_id) const
1385{
1386 // Return the default remote address if set.
1387 // Note that the default remote addresses are the addresses
1388 // set in the 'c=' and 'a=rtcp' lines of the received SDP.
1389 // See pj_ice_strans_sendto2() for more details.
1390 if (auto defaultAddr = pimpl_->getDefaultRemoteAddress(comp_id)) {
1391 return defaultAddr;
1392 }
1393
1394 return pimpl_->getRemoteAddress(comp_id);
1395}
1396
1397const IceTransport::Attribute
1398IceTransport::getLocalAttributes() const
1399{
1400 return {pimpl_->local_ufrag_, pimpl_->local_pwd_};
1401}
1402
1403std::vector<std::string>
1404IceTransport::getLocalCandidates(unsigned comp_id) const
1405{
1406 ASSERT_COMP_ID(comp_id, getComponentCount());
1407 std::vector<std::string> res;
1408 pj_ice_sess_cand cand[MAX_CANDIDATES];
1409 unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
1410
1411 if (!isInitialized()) {
1412 return res;
1413 }
1414
1415 if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
1416 if (pimpl_->logger_)
1417 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1418 return res;
1419 }
1420
1421 res.reserve(cand_cnt);
1422 for (unsigned i = 0; i < cand_cnt; ++i) {
1423 /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544)
1424 * candidate-attribute = "candidate" ":" foundation SP component-id
1425 * SP "TCP" SP priority SP connection-address SP port SP cand-type [SP
1426 * rel-addr] [SP rel-port] SP tcp-type-ext
1427 * *(SP extension-att-name SP
1428 * extension-att-value)
1429 *
1430 * tcp-type-ext = "tcptype" SP tcp-type
1431 * tcp-type = "active" / "passive" / "so"
1432 */
1433 char ipaddr[PJ_INET6_ADDRSTRLEN];
1434 std::string tcp_type;
1435 if (cand[i].transport != PJ_CAND_UDP) {
1436 tcp_type += " tcptype";
1437 switch (cand[i].transport) {
1438 case PJ_CAND_TCP_ACTIVE:
1439 tcp_type += " active";
1440 break;
1441 case PJ_CAND_TCP_PASSIVE:
1442 tcp_type += " passive";
1443 break;
1444 case PJ_CAND_TCP_SO:
1445 default:
1446 tcp_type += " so";
1447 break;
1448 }
1449 }
1450 res.emplace_back(
1451 fmt::format("{} {} {} {} {} {} typ {}{}",
1452 sip_utils::as_view(cand[i].foundation),
1453 cand[i].comp_id,
1454 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1455 cand[i].prio,
1456 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1457 pj_sockaddr_get_port(&cand[i].addr),
1458 pj_ice_get_cand_type_name(cand[i].type),
1459 tcp_type));
1460 }
1461
1462 return res;
1463}
1464std::vector<std::string>
1465IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
1466{
1467 ASSERT_COMP_ID(compId, getComponentCount());
1468
1469 std::vector<std::string> res;
1470 pj_ice_sess_cand cand[MAX_CANDIDATES];
1471 unsigned cand_cnt = MAX_CANDIDATES;
1472
1473 if (not isInitialized()) {
1474 return res;
1475 }
1476
1477 // In the implementation, the component IDs are enumerated globally
1478 // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create
1479 // only one pj_ice_strans instance. However, the component IDs are
1480 // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in
1481 // order to be compliant with the spec.
1482
1483 auto globalCompId = streamIdx * 2 + compId;
1484 if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) {
1485 if (pimpl_->logger_)
1486 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1487 return res;
1488 }
1489
1490 res.reserve(cand_cnt);
1491 // Build ICE attributes according to RFC 6544, section 4.5.
1492 for (unsigned i = 0; i < cand_cnt; ++i) {
1493 char ipaddr[PJ_INET6_ADDRSTRLEN];
1494 std::string tcp_type;
1495 if (cand[i].transport != PJ_CAND_UDP) {
1496 tcp_type += " tcptype";
1497 switch (cand[i].transport) {
1498 case PJ_CAND_TCP_ACTIVE:
1499 tcp_type += " active";
1500 break;
1501 case PJ_CAND_TCP_PASSIVE:
1502 tcp_type += " passive";
1503 break;
1504 case PJ_CAND_TCP_SO:
1505 default:
1506 tcp_type += " so";
1507 break;
1508 }
1509 }
1510 res.emplace_back(
1511 fmt::format("{} {} {} {} {} {} typ {}{}",
1512 sip_utils::as_view(cand[i].foundation),
1513 compId,
1514 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1515 cand[i].prio,
1516 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1517 pj_sockaddr_get_port(&cand[i].addr),
1518 pj_ice_get_cand_type_name(cand[i].type),
1519 tcp_type));
1520 }
1521
1522 return res;
1523}
1524
1525bool
1526IceTransport::parseIceAttributeLine(unsigned streamIdx,
1527 const std::string& line,
1528 IceCandidate& cand) const
1529{
1530 // Silently ignore empty lines
1531 if (line.empty())
1532 return false;
1533
1534 if (streamIdx >= pimpl_->streamsCount_) {
1535 throw std::runtime_error(fmt::format("Stream index {:d} is invalid!", streamIdx));
1536 }
1537
1538 int af, cnt;
1539 char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32];
1540 pj_str_t tmpaddr;
1541 unsigned comp_id, prio, port;
1542 pj_status_t status;
1543 pj_bool_t is_tcp = PJ_FALSE;
1544
1545 // Parse ICE attribute line according to RFC-6544 section 4.5.
1546 // TODO/WARNING: There is no fail-safe in case of malformed attributes.
1547 cnt = sscanf(line.c_str(),
1548 "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n",
1549 foundation,
1550 &comp_id,
1551 transport,
1552 &prio,
1553 ipaddr,
1554 &port,
1555 type,
1556 tcp_type);
1557 if (cnt != 7 && cnt != 8) {
1558 if (pimpl_->logger_)
1559 pimpl_->logger_->error("[ice:{}] Invalid ICE candidate line: {:s}", fmt::ptr(pimpl_), line);
1560 return false;
1561 }
1562
1563 if (strcmp(transport, "TCP") == 0) {
1564 is_tcp = PJ_TRUE;
1565 }
1566
1567 pj_bzero(&cand, sizeof(IceCandidate));
1568
1569 if (strcmp(type, "host") == 0)
1570 cand.type = PJ_ICE_CAND_TYPE_HOST;
1571 else if (strcmp(type, "srflx") == 0)
1572 cand.type = PJ_ICE_CAND_TYPE_SRFLX;
1573 else if (strcmp(type, "prflx") == 0)
1574 cand.type = PJ_ICE_CAND_TYPE_PRFLX;
1575 else if (strcmp(type, "relay") == 0)
1576 cand.type = PJ_ICE_CAND_TYPE_RELAYED;
1577 else {
1578 if (pimpl_->logger_)
1579 pimpl_->logger_->warn("[ice:{}] invalid remote candidate type '{:s}'", fmt::ptr(pimpl_), type);
1580 return false;
1581 }
1582
1583 if (is_tcp) {
1584 if (strcmp(tcp_type, "active") == 0)
1585 cand.transport = PJ_CAND_TCP_ACTIVE;
1586 else if (strcmp(tcp_type, "passive") == 0)
1587 cand.transport = PJ_CAND_TCP_PASSIVE;
1588 else if (strcmp(tcp_type, "so") == 0)
1589 cand.transport = PJ_CAND_TCP_SO;
1590 else {
1591 if (pimpl_->logger_)
1592 pimpl_->logger_->warn("[ice:{}] invalid transport type type '{:s}'", fmt::ptr(pimpl_), tcp_type);
1593 return false;
1594 }
1595 } else {
1596 cand.transport = PJ_CAND_UDP;
1597 }
1598
1599 // If the component Id is enumerated relative to media, convert
1600 // it to absolute enumeration.
1601 if (comp_id <= pimpl_->compCountPerStream_) {
1602 comp_id += pimpl_->compCountPerStream_ * streamIdx;
1603 }
1604 cand.comp_id = (pj_uint8_t) comp_id;
1605
1606 cand.prio = prio;
1607
1608 if (strchr(ipaddr, ':'))
1609 af = pj_AF_INET6();
1610 else {
1611 af = pj_AF_INET();
1612 pimpl_->onlyIPv4Private_ &= IpAddr(ipaddr).isPrivate();
1613 }
1614
1615 tmpaddr = pj_str(ipaddr);
1616 pj_sockaddr_init(af, &cand.addr, NULL, 0);
1617 status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr);
1618 if (status != PJ_SUCCESS) {
1619 if (pimpl_->logger_)
1620 pimpl_->logger_->warn("[ice:{}] invalid IP address '{:s}'", fmt::ptr(pimpl_), ipaddr);
1621 return false;
1622 }
1623
1624 pj_sockaddr_set_port(&cand.addr, (pj_uint16_t) port);
1625 pj_strdup2(pimpl_->pool_.get(), &cand.foundation, foundation);
1626
1627 return true;
1628}
1629
1630ssize_t
1631IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec)
1632{
1633 ASSERT_COMP_ID(compId, getComponentCount());
1634 auto& io = pimpl_->compIO_[compId - 1];
1635 std::lock_guard<std::mutex> lk(io.mutex);
1636
1637 if (io.queue.empty()) {
1638 ec = std::make_error_code(std::errc::resource_unavailable_try_again);
1639 return -1;
1640 }
1641
1642 auto& packet = io.queue.front();
1643 const auto count = std::min(len, packet.data.size());
1644 std::copy_n(packet.data.begin(), count, buf);
1645 if (count == packet.data.size()) {
1646 io.queue.pop_front();
1647 } else {
1648 packet.data.erase(packet.data.begin(), packet.data.begin() + count);
1649 }
1650
1651 ec.clear();
1652 return count;
1653}
1654
1655ssize_t
1656IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec)
1657{
1658 ASSERT_COMP_ID(compId, getComponentCount());
1659 return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec);
1660}
1661
1662void
1663IceTransport::setOnRecv(unsigned compId, IceRecvCb cb)
1664{
1665 ASSERT_COMP_ID(compId, getComponentCount());
1666
1667 auto& io = pimpl_->compIO_[compId - 1];
1668 std::lock_guard<std::mutex> lk(io.mutex);
1669 io.recvCb = std::move(cb);
1670
1671 if (io.recvCb) {
1672 // Flush existing queue using the callback
1673 for (const auto& packet : io.queue)
1674 io.recvCb((uint8_t*) packet.data.data(), packet.data.size());
1675 io.queue.clear();
1676 }
1677}
1678
1679void
1680IceTransport::setOnShutdown(onShutdownCb&& cb)
1681{
1682 pimpl_->scb = cb;
1683}
1684
1685ssize_t
1686IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
1687{
1688 ASSERT_COMP_ID(compId, getComponentCount());
1689
1690 auto remote = getRemoteAddress(compId);
1691
1692 if (!remote) {
1693 if (pimpl_->logger_)
1694 pimpl_->logger_->error("[ice:{}] can't find remote address for component {:d}", fmt::ptr(pimpl_), compId);
1695 errno = EINVAL;
1696 return -1;
1697 }
1698
1699 std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock);
1700 if (isTCPEnabled())
1701 dlk.lock();
1702
1703 jami_tracepoint(ice_transport_send,
1704 reinterpret_cast<uint64_t>(this),
1705 compId,
1706 len,
1707 remote.toString().c_str());
1708
1709 auto status = pj_ice_strans_sendto2(pimpl_->icest_,
1710 compId,
1711 buf,
1712 len,
1713 remote.pjPtr(),
1714 remote.getLength());
1715
1716 jami_tracepoint(ice_transport_send_status, status);
1717
1718 if (status == PJ_EPENDING && isTCPEnabled()) {
1719 // NOTE; because we are in TCP, the sent size will count the header (2
1720 // bytes length).
1721 pimpl_->waitDataCv_.wait(dlk, [&] {
1722 return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_;
1723 });
1724 pimpl_->lastSentLen_ = 0;
1725 } else if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1726 if (status == PJ_EBUSY) {
1727 errno = EAGAIN;
1728 } else {
1729 if (pimpl_->logger_)
1730 pimpl_->logger_->error("[ice:{}] ice send failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1731 errno = EIO;
1732 }
1733 return -1;
1734 }
1735
1736 return len;
1737}
1738
1739bool
1740IceTransport::waitForInitialization(std::chrono::milliseconds timeout)
1741{
1742 return pimpl_->_waitForInitialization(timeout);
1743}
1744
1745ssize_t
1746IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec)
1747{
1748 ASSERT_COMP_ID(compId, getComponentCount());
1749 return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec);
1750}
1751
1752bool
1753IceTransport::isTCPEnabled()
1754{
1755 return pimpl_->isTcpEnabled();
1756}
1757
1758ICESDP
1759IceTransport::parseIceCandidates(std::string_view sdp_msg)
1760{
1761 if (pimpl_->streamsCount_ != 1) {
1762 if (pimpl_->logger_)
1763 pimpl_->logger_->error("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_);
1764 return {};
1765 }
1766
1767 ICESDP res;
1768 int nr = 0;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -04001769 for (std::string_view line; dhtnet::getline(sdp_msg, line); nr++) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001770 if (nr == 0) {
1771 res.rem_ufrag = line;
1772 } else if (nr == 1) {
1773 res.rem_pwd = line;
1774 } else {
1775 IceCandidate cand;
1776 if (parseIceAttributeLine(0, std::string(line), cand)) {
1777 if (pimpl_->logger_)
1778 pimpl_->logger_->debug("[ice:{}] Add remote candidate: {}",
1779 fmt::ptr(pimpl_),
1780 line);
1781 res.rem_candidates.emplace_back(cand);
1782 }
1783 }
1784 }
1785 return res;
1786}
1787
1788void
1789IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr)
1790{
1791 pimpl_->setDefaultRemoteAddress(comp_id, addr);
1792}
1793
1794std::string
1795IceTransport::link() const
1796{
1797 return pimpl_->link();
1798}
1799
1800//==============================================================================
1801
1802IceTransportFactory::IceTransportFactory()
1803 : cp_(new pj_caching_pool(),
1804 [](pj_caching_pool* p) {
1805 pj_caching_pool_destroy(p);
1806 delete p;
1807 })
1808 , ice_cfg_()
1809{
1810 pj_caching_pool_init(cp_.get(), NULL, 0);
1811
1812 pj_ice_strans_cfg_default(&ice_cfg_);
1813 ice_cfg_.stun_cfg.pf = &cp_->factory;
1814
1815 // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires
1816 // a minimum of 500ms on fixed-line links. Our usual case is wireless links.
1817 // This solves too long ICE exchange by DHT.
1818 // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout.
1819 ice_cfg_.stun_cfg.rto_msec = 500;
1820
1821 // See https://tools.ietf.org/html/rfc5245#section-8.1.1.2
1822 // If enabled, it may help speed-up the connectivity, but may cause
1823 // the nomination of sub-optimal pairs.
1824 ice_cfg_.opt.aggressive = PJ_FALSE;
1825}
1826
1827IceTransportFactory::~IceTransportFactory() {}
1828
1829std::shared_ptr<IceTransport>
1830IceTransportFactory::createTransport(std::string_view name)
1831{
1832 try {
1833 return std::make_shared<IceTransport>(name);
1834 } catch (const std::exception& e) {
1835 //JAMI_ERR("%s", e.what());
1836 return nullptr;
1837 }
1838}
1839
1840std::unique_ptr<IceTransport>
1841IceTransportFactory::createUTransport(std::string_view name)
1842{
1843 try {
1844 return std::make_unique<IceTransport>(name);
1845 } catch (const std::exception& e) {
1846 //JAMI_ERR("%s", e.what());
1847 return nullptr;
1848 }
1849}
1850
1851//==============================================================================
1852
1853void
1854IceSocket::close()
1855{
1856 if (ice_transport_)
1857 ice_transport_->setOnRecv(compId_, {});
1858 ice_transport_.reset();
1859}
1860
1861ssize_t
1862IceSocket::send(const unsigned char* buf, size_t len)
1863{
1864 if (not ice_transport_)
1865 return -1;
1866 return ice_transport_->send(compId_, buf, len);
1867}
1868
1869ssize_t
1870IceSocket::waitForData(std::chrono::milliseconds timeout)
1871{
1872 if (not ice_transport_)
1873 return -1;
1874
1875 std::error_code ec;
1876 return ice_transport_->waitForData(compId_, timeout, ec);
1877}
1878
1879void
1880IceSocket::setOnRecv(IceRecvCb cb)
1881{
1882 if (ice_transport_)
1883 ice_transport_->setOnRecv(compId_, cb);
1884}
1885
1886uint16_t
1887IceSocket::getTransportOverhead()
1888{
1889 if (not ice_transport_)
1890 return 0;
1891
1892 return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? IPV4_HEADER_SIZE
1893 : IPV6_HEADER_SIZE;
1894}
1895
1896void
1897IceSocket::setDefaultRemoteAddress(const IpAddr& addr)
1898{
1899 if (ice_transport_)
1900 ice_transport_->setDefaultRemoteAddress(compId_, addr);
1901}
1902
1903} // namespace jami