blob: 12c01227bf1cecb6f43b0a55f3b7a6f9dcf02cd9 [file] [log] [blame]
Adrien BĂ©raud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
3 *
4 * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21#include "ice_transport.h"
22#include "ice_socket.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25#include "upnp/upnp_control.h"
26#include "transport/peer_channel.h"
27#include "tracepoint/tracepoint.h"
28
29#include <opendht/logger.h>
30#include <opendht/utils.h>
31
32#include <pjlib.h>
33
34#include <map>
35#include <atomic>
36#include <queue>
37#include <mutex>
38#include <condition_variable>
39#include <thread>
40#include <utility>
41#include <tuple>
42#include <algorithm>
43#include <sstream>
44#include <chrono>
45#include <thread>
46#include <cerrno>
47
48#include "pj/limits.h"
49
// Evaluate a PJSIP call and throw std::runtime_error, naming the failed
// expression, when the call does not return PJ_SUCCESS.
#define TRY(ret) \
    do { \
        if (PJ_SUCCESS != (ret)) \
            throw std::runtime_error(#ret " failed"); \
    } while (0)
55
// Component IDs are 1-based: throw std::runtime_error unless
// 1 <= compId <= compCount.
#define ASSERT_COMP_ID(compId, compCount) \
    do { \
        if (not((compId) != 0 and (compId) <= (compCount))) \
            throw std::runtime_error("Invalid component ID " + (std::to_string(compId))); \
    } while (0)
62
63namespace jami {
64
/// Value assigned to max_pkt_size of every STUN/TURN transport configuration.
static constexpr unsigned STUN_MAX_PACKET_SIZE = 8192;
/// Size in bytes of IPV6 packet header
static constexpr uint16_t IPV6_HEADER_SIZE {40};
/// Size in bytes of IPV4 packet header
static constexpr uint16_t IPV4_HEADER_SIZE {20};
/// Candidate count limit (not referenced in the code visible here).
static constexpr int MAX_CANDIDATES = 32;
/// Upper bound, in milliseconds, on the time spent flushing timers at destruction.
static constexpr int MAX_DESTRUCTION_TIMEOUT = 3000;
/// Polling period, in milliseconds, used by the event thread and the final flush.
static constexpr int HANDLE_EVENT_DURATION = 500;
71
72//==============================================================================
73
74using MutexGuard = std::lock_guard<std::mutex>;
75using MutexLock = std::unique_lock<std::mutex>;
76using namespace upnp;
77
78//==============================================================================
79
80class IceLock
81{
82 pj_grp_lock_t* lk_;
83
84public:
85 IceLock(pj_ice_strans* strans)
86 : lk_(pj_ice_strans_get_grp_lock(strans))
87 {
88 lock();
89 }
90
91 ~IceLock() { unlock(); }
92
93 void lock() { if (lk_) pj_grp_lock_acquire(lk_); }
94
95 void unlock() { if (lk_) pj_grp_lock_release(lk_); }
96};
97
98class IceTransport::Impl
99{
100public:
101 Impl(std::string_view name);
102 ~Impl();
103
104 void initIceInstance(const IceTransportOptions& options);
105
106 void onComplete(pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status);
107
108 void onReceiveData(unsigned comp_id, void* pkt, pj_size_t size);
109
110 /**
111 * Set/change transport role as initiator.
112 * Should be called before start method.
113 */
114 bool setInitiatorSession();
115
116 /**
117 * Set/change transport role as slave.
118 * Should be called before start method.
119 */
120 bool setSlaveSession();
121 bool createIceSession(pj_ice_sess_role role);
122
123 void getUFragPwd();
124
125 std::string link() const;
126
127 bool _isInitialized() const;
128 bool _isStarted() const;
129 bool _isRunning() const;
130 bool _isFailed() const;
131 bool _waitForInitialization(std::chrono::milliseconds timeout);
132
133 const pj_ice_sess_cand* getSelectedCandidate(unsigned comp_id, bool remote) const;
134 IpAddr getLocalAddress(unsigned comp_id) const;
135 IpAddr getRemoteAddress(unsigned comp_id) const;
136 static const char* getCandidateType(const pj_ice_sess_cand* cand);
137 bool isTcpEnabled() const { return config_.protocol == PJ_ICE_TP_TCP; }
138 bool addStunConfig(int af);
139 void requestUpnpMappings();
140 bool hasUpnp() const;
141 // Take a list of address pairs (local/public) and add them as
142 // reflexive candidates using STUN config.
143 void addServerReflexiveCandidates(const std::vector<std::pair<IpAddr, IpAddr>>& addrList);
144 // Generate server reflexive candidates using the published (DHT/Account) address
145 std::vector<std::pair<IpAddr, IpAddr>> setupGenericReflexiveCandidates();
146 // Generate server reflexive candidates using UPNP mappings.
147 std::vector<std::pair<IpAddr, IpAddr>> setupUpnpReflexiveCandidates();
148 void setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr);
149 IpAddr getDefaultRemoteAddress(unsigned comp_id) const;
150 bool handleEvents(unsigned max_msec);
151 int flushTimerHeapAndIoQueue();
152 int checkEventQueue(int maxEventToPoll);
153
154 std::shared_ptr<dht::log::Logger> logger_ {};
155
156 std::condition_variable_any iceCV_ {};
157
158 std::string sessionName_ {};
159 std::unique_ptr<pj_pool_t, decltype(&pj_pool_release)> pool_ {nullptr, pj_pool_release};
160 bool isTcp_ {false};
161 bool upnpEnabled_ {false};
162 IceTransportCompleteCb on_initdone_cb_ {};
163 IceTransportCompleteCb on_negodone_cb_ {};
164 pj_ice_strans* icest_ {nullptr};
165 unsigned streamsCount_ {0};
166 unsigned compCountPerStream_ {0};
167 unsigned compCount_ {0};
168 std::string local_ufrag_ {};
169 std::string local_pwd_ {};
170 pj_sockaddr remoteAddr_ {};
171 pj_ice_strans_cfg config_ {};
172 //std::string last_errmsg_ {};
173
174 std::atomic_bool is_stopped_ {false};
175
176 struct Packet
177 {
178 Packet(void* pkt, pj_size_t size)
179 : data {reinterpret_cast<char*>(pkt), reinterpret_cast<char*>(pkt) + size}
180 {}
181 std::vector<char> data {};
182 };
183
184 struct ComponentIO
185 {
186 std::mutex mutex;
187 std::condition_variable cv;
188 std::deque<Packet> queue;
189 IceRecvCb recvCb;
190 };
191
192 // NOTE: Component IDs start from 1, while these three vectors
193 // are indexed from 0. Conversion from ID to vector index must
194 // be done properly.
195 std::vector<ComponentIO> compIO_ {};
196 std::vector<PeerChannel> peerChannels_ {};
197 std::vector<IpAddr> iceDefaultRemoteAddr_;
198
199 // ICE controlling role. True for controller agents and false for
200 // controlled agents
201 std::atomic_bool initiatorSession_ {true};
202
203 // Local/Public addresses used by the account owning the ICE instance.
204 IpAddr accountLocalAddr_ {};
205 IpAddr accountPublicAddr_ {};
206
207 // STUN and TURN servers
208 std::vector<StunServerInfo> stunServers_;
209 std::vector<TurnServerInfo> turnServers_;
210
211 /**
212 * Returns the IP of each candidate for a given component in the ICE session
213 */
214 struct LocalCandidate
215 {
216 IpAddr addr;
217 pj_ice_cand_transport transport;
218 };
219
220 std::shared_ptr<upnp::Controller> upnp_ {};
221 std::mutex upnpMutex_ {};
222 std::map<Mapping::key_t, Mapping> upnpMappings_;
223 std::mutex upnpMappingsMutex_ {};
224
225 bool onlyIPv4Private_ {true};
226
227 // IO/Timer events are handled by following thread
228 std::thread thread_ {};
229 std::atomic_bool threadTerminateFlags_ {false};
230
231 // Wait data on components
232 mutable std::mutex sendDataMutex_ {};
233 std::condition_variable waitDataCv_ = {};
234 pj_size_t lastSentLen_ {0};
235 bool destroying_ {false};
236 onShutdownCb scb {};
237
238 void cancelOperations()
239 {
240 for (auto& c : peerChannels_)
241 c.stop();
242 std::lock_guard<std::mutex> lk(sendDataMutex_);
243 destroying_ = true;
244 waitDataCv_.notify_all();
245 }
246};
247
248//==============================================================================
249
250/**
251 * Add stun/turn configuration or default host as candidates
252 */
253
254static void
255add_stun_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const StunServerInfo& info)
256{
257 if (cfg.stun_tp_cnt >= PJ_ICE_MAX_STUN)
258 throw std::runtime_error("Too many STUN configurations");
259
260 IpAddr ip {info.uri};
261
262 // Given URI cannot be DNS resolved or not IPv4 or IPv6?
263 // This prevents a crash into PJSIP when ip.toString() is called.
264 if (ip.getFamily() == AF_UNSPEC) {
265 /*JAMI_DBG("[ice (%s)] STUN server '%s' not used, unresolvable address",
266 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
267 info.uri.c_str());*/
268 return;
269 }
270
271 auto& stun = cfg.stun_tp[cfg.stun_tp_cnt++];
272 pj_ice_strans_stun_cfg_default(&stun);
273 pj_strdup2_with_null(&pool, &stun.server, ip.toString().c_str());
274 stun.af = ip.getFamily();
275 if (!(stun.port = ip.getPort()))
276 stun.port = PJ_STUN_PORT;
277 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
278 stun.conn_type = cfg.stun.conn_type;
279 /*JAMI_DBG("[ice (%s)] added stun server '%s', port %u",
280 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
281 pj_strbuf(&stun.server),
282 stun.port);*/
283}
284
285static void
286add_turn_server(pj_pool_t& pool, pj_ice_strans_cfg& cfg, const TurnServerInfo& info)
287{
288 if (cfg.turn_tp_cnt >= PJ_ICE_MAX_TURN)
289 throw std::runtime_error("Too many TURN servers");
290
291 IpAddr ip {info.uri};
292
293 // Same comment as add_stun_server()
294 if (ip.getFamily() == AF_UNSPEC) {
295 /*JAMI_DBG("[ice (%s)] TURN server '%s' not used, unresolvable address",
296 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
297 info.uri.c_str());*/
298 return;
299 }
300
301 auto& turn = cfg.turn_tp[cfg.turn_tp_cnt++];
302 pj_ice_strans_turn_cfg_default(&turn);
303 pj_strdup2_with_null(&pool, &turn.server, ip.toString().c_str());
304 turn.af = ip.getFamily();
305 if (!(turn.port = ip.getPort()))
306 turn.port = PJ_STUN_PORT;
307 turn.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
308 turn.conn_type = cfg.turn.conn_type;
309
310 // Authorization (only static plain password supported yet)
311 if (not info.password.empty()) {
312 turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
313 turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
314 pj_strset(&turn.auth_cred.data.static_cred.realm,
315 (char*) info.realm.c_str(),
316 info.realm.size());
317 pj_strset(&turn.auth_cred.data.static_cred.username,
318 (char*) info.username.c_str(),
319 info.username.size());
320 pj_strset(&turn.auth_cred.data.static_cred.data,
321 (char*) info.password.c_str(),
322 info.password.size());
323 }
324
325 /*JAMI_DBG("[ice (%s)] added turn server '%s', port %u",
326 (cfg.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
327 pj_strbuf(&turn.server),
328 turn.port);*/
329}
330
331//==============================================================================
332
333IceTransport::Impl::Impl(std::string_view name)
334 : sessionName_(name)
335{
336 if (logger_)
337 logger_->debug("[ice:{}] Creating IceTransport session for \"{:s}\"", fmt::ptr(this), name);
338}
339
340IceTransport::Impl::~Impl()
341{
342 if (logger_)
343 logger_->debug("[ice:{}] destroying {}", fmt::ptr(this), fmt::ptr(icest_));
344
345 threadTerminateFlags_ = true;
346
347 if (thread_.joinable()) {
348 thread_.join();
349 }
350
351 if (icest_) {
352 pj_ice_strans* strans = nullptr;
353
354 std::swap(strans, icest_);
355
356 // must be done before ioqueue/timer destruction
357 if (logger_)
358 logger_->debug("[ice:{}] Destroying ice_strans {}", pj_ice_strans_get_user_data(strans), fmt::ptr(strans));
359
360 pj_ice_strans_stop_ice(strans);
361 pj_ice_strans_destroy(strans);
362
363 // NOTE: This last timer heap and IO queue polling is necessary to close
364 // TURN socket.
365 // Because when destroying the TURN session pjproject creates a pj_timer
366 // to postpone the TURN destruction. This timer is only called if we poll
367 // the event queue.
368
369 int ret = flushTimerHeapAndIoQueue();
370
371 if (ret < 0) {
372 if (logger_)
373 logger_->error("[ice:{}] IO queue polling failed", fmt::ptr(this));
374 } else if (ret > 0) {
375 if (logger_)
376 logger_->error("[ice:{}] Unexpected left timer in timer heap. "
377 "Please report the bug",
378 fmt::ptr(this));
379 }
380
381 if (checkEventQueue(1) > 0) {
382 if (logger_)
383 logger_->warn("[ice:{}] Unexpected left events in IO queue", fmt::ptr(this));
384 }
385
386 if (config_.stun_cfg.ioqueue)
387 pj_ioqueue_destroy(config_.stun_cfg.ioqueue);
388
389 if (config_.stun_cfg.timer_heap)
390 pj_timer_heap_destroy(config_.stun_cfg.timer_heap);
391 }
392
393 if (logger_)
394 logger_->debug("[ice:%p] done destroying", fmt::ptr(this));
395 if (scb)
396 scb();
397}
398
399void
400IceTransport::Impl::initIceInstance(const IceTransportOptions& options)
401{
402 isTcp_ = options.tcpEnable;
403 upnpEnabled_ = options.upnpEnable;
404 on_initdone_cb_ = options.onInitDone;
405 on_negodone_cb_ = options.onNegoDone;
406 streamsCount_ = options.streamsCount;
407 compCountPerStream_ = options.compCountPerStream;
408 compCount_ = streamsCount_ * compCountPerStream_;
409 compIO_ = std::vector<ComponentIO>(compCount_);
410 peerChannels_ = std::vector<PeerChannel>(compCount_);
411 iceDefaultRemoteAddr_.resize(compCount_);
412 initiatorSession_ = options.master;
413 accountLocalAddr_ = std::move(options.accountLocalAddr);
414 accountPublicAddr_ = std::move(options.accountPublicAddr);
415 stunServers_ = std::move(options.stunServers);
416 turnServers_ = std::move(options.turnServers);
417
418 if (logger_)
419 logger_->debug("[ice:{}] Initializing the session - comp count {} - as a {}",
420 fmt::ptr(this),
421 compCount_,
422 initiatorSession_ ? "master" : "slave");
423
424 if (upnpEnabled_)
425 upnp_.reset(new upnp::Controller());
426
427 config_ = options.factory->getIceCfg(); // config copy
428 if (isTcp_) {
429 config_.protocol = PJ_ICE_TP_TCP;
430 config_.stun.conn_type = PJ_STUN_TP_TCP;
431 config_.turn.conn_type = PJ_TURN_TP_TCP;
432 } else {
433 config_.protocol = PJ_ICE_TP_UDP;
434 config_.stun.conn_type = PJ_STUN_TP_UDP;
435 config_.turn.conn_type = PJ_TURN_TP_UDP;
436 }
437
438 pool_.reset(
439 pj_pool_create(options.factory->getPoolFactory(), "IceTransport.pool", 512, 512, NULL));
440 if (not pool_)
441 throw std::runtime_error("pj_pool_create() failed");
442
443 // Note: For server reflexive candidates, UPNP mappings will
444 // be used if available. Then, the public address learnt during
445 // the account registration process will be added only if it
446 // differs from the UPNP public address.
447 // Also note that UPNP candidates should be added first in order
448 // to have a higher priority when performing the connectivity
449 // checks.
450 // STUN configs layout:
451 // - index 0 : host IPv4
452 // - index 1 : host IPv6
453 // - index 2 : upnp/generic srflx IPv4.
454 // - index 3 : generic srflx (if upnp exists and different)
455
456 config_.stun_tp_cnt = 0;
457
458 if (logger_)
459 logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
460 addStunConfig(pj_AF_INET());
461 addStunConfig(pj_AF_INET6());
462
463 std::vector<std::pair<IpAddr, IpAddr>> upnpSrflxCand;
464 if (upnp_) {
465 requestUpnpMappings();
466 upnpSrflxCand = setupUpnpReflexiveCandidates();
467 if (not upnpSrflxCand.empty()) {
468 addServerReflexiveCandidates(upnpSrflxCand);
469 if (logger_)
470 logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
471 }
472 }
473
474 auto genericSrflxCand = setupGenericReflexiveCandidates();
475
476 if (not genericSrflxCand.empty()) {
477 // Generic srflx candidates will be added only if different
478 // from upnp candidates.
479 if (upnpSrflxCand.empty()
480 or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) {
481 addServerReflexiveCandidates(genericSrflxCand);
482 if (logger_)
483 logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
484 }
485 }
486
487 if (upnpSrflxCand.empty() and genericSrflxCand.empty()) {
488 if (logger_)
489 logger_->warn("[ice:{}] No server reflexive candidates added", fmt::ptr(this));
490 }
491
492 pj_ice_strans_cb icecb;
493 pj_bzero(&icecb, sizeof(icecb));
494
495 icecb.on_rx_data = [](pj_ice_strans* ice_st,
496 unsigned comp_id,
497 void* pkt,
498 pj_size_t size,
499 const pj_sockaddr_t* /*src_addr*/,
500 unsigned /*src_addr_len*/) {
501 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
502 tr->onReceiveData(comp_id, pkt, size);
503 };
504
505 icecb.on_ice_complete = [](pj_ice_strans* ice_st, pj_ice_strans_op op, pj_status_t status) {
506 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
507 tr->onComplete(ice_st, op, status);
508 };
509
510 if (isTcp_) {
511 icecb.on_data_sent = [](pj_ice_strans* ice_st, pj_ssize_t size) {
512 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st))) {
513 std::lock_guard lk(tr->sendDataMutex_);
514 tr->lastSentLen_ += size;
515 tr->waitDataCv_.notify_all();
516 }
517 };
518 }
519
520 icecb.on_destroy = [](pj_ice_strans* ice_st) {
521 if (auto* tr = static_cast<Impl*>(pj_ice_strans_get_user_data(ice_st)))
522 tr->cancelOperations(); // Avoid upper layer to manage this ; Stop read operations
523 };
524
525 // Add STUN servers
526 for (auto& server : stunServers_)
527 add_stun_server(*pool_, config_, server);
528
529 // Add TURN servers
530 for (auto& server : turnServers_)
531 add_turn_server(*pool_, config_, server);
532
533 static constexpr auto IOQUEUE_MAX_HANDLES = std::min(PJ_IOQUEUE_MAX_HANDLES, 64);
534 TRY(pj_timer_heap_create(pool_.get(), 100, &config_.stun_cfg.timer_heap));
535 TRY(pj_ioqueue_create(pool_.get(), IOQUEUE_MAX_HANDLES, &config_.stun_cfg.ioqueue));
536 std::ostringstream sessionName {};
537 // We use the instance pointer as the PJNATH session name in order
538 // to easily identify the logs reported by PJNATH.
539 sessionName << this;
540 pj_status_t status = pj_ice_strans_create(sessionName.str().c_str(),
541 &config_,
542 compCount_,
543 this,
544 &icecb,
545 &icest_);
546
547 if (status != PJ_SUCCESS || icest_ == nullptr) {
548 throw std::runtime_error("pj_ice_strans_create() failed");
549 }
550
551 // Must be created after any potential failure
552 thread_ = std::thread([this] {
553 while (not threadTerminateFlags_) {
554 // NOTE: handleEvents can return false in this case
555 // but here we don't care if there is event or not.
556 handleEvents(HANDLE_EVENT_DURATION);
557 }
558 });
559}
560
561bool
562IceTransport::Impl::_isInitialized() const
563{
564 if (auto *icest = icest_) {
565 auto state = pj_ice_strans_get_state(icest);
566 return state >= PJ_ICE_STRANS_STATE_SESS_READY and state != PJ_ICE_STRANS_STATE_FAILED;
567 }
568 return false;
569}
570
571bool
572IceTransport::Impl::_isStarted() const
573{
574 if (auto *icest = icest_) {
575 auto state = pj_ice_strans_get_state(icest);
576 return state >= PJ_ICE_STRANS_STATE_NEGO and state != PJ_ICE_STRANS_STATE_FAILED;
577 }
578 return false;
579}
580
581bool
582IceTransport::Impl::_isRunning() const
583{
584 if (auto *icest = icest_) {
585 auto state = pj_ice_strans_get_state(icest);
586 return state >= PJ_ICE_STRANS_STATE_RUNNING and state != PJ_ICE_STRANS_STATE_FAILED;
587 }
588 return false;
589}
590
591bool
592IceTransport::Impl::_isFailed() const
593{
594 if (auto *icest = icest_)
595 return pj_ice_strans_get_state(icest) == PJ_ICE_STRANS_STATE_FAILED;
596 return false;
597}
598
599bool
600IceTransport::Impl::handleEvents(unsigned max_msec)
601{
602 // By tests, never seen more than two events per 500ms
603 static constexpr auto MAX_NET_EVENTS = 2;
604
605 pj_time_val max_timeout = {0, static_cast<long>(max_msec)};
606 pj_time_val timeout = {0, 0};
607 unsigned net_event_count = 0;
608
609 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timeout);
610 auto hasActiveTimer = timeout.sec != PJ_MAXINT32 || timeout.msec != PJ_MAXINT32;
611
612 // timeout limitation
613 if (hasActiveTimer)
614 pj_time_val_normalize(&timeout);
615
616 if (PJ_TIME_VAL_GT(timeout, max_timeout)) {
617 timeout = max_timeout;
618 }
619
620 do {
621 auto n_events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
622
623 // timeout
624 if (not n_events)
625 return hasActiveTimer;
626
627 // error
628 if (n_events < 0) {
629 const auto err = pj_get_os_error();
630 // Kept as debug as some errors are "normal" in regular context
631 if (logger_)
632 logger_->debug("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
633 std::this_thread::sleep_for(std::chrono::milliseconds(PJ_TIME_VAL_MSEC(timeout)));
634 return hasActiveTimer;
635 }
636
637 net_event_count += n_events;
638 timeout.sec = timeout.msec = 0;
639 } while (net_event_count < MAX_NET_EVENTS);
640 return hasActiveTimer;
641}
642
643int
644IceTransport::Impl::flushTimerHeapAndIoQueue()
645{
646 pj_time_val timerTimeout = {0, 0};
647 pj_time_val defaultWaitTime = {0, HANDLE_EVENT_DURATION};
648 bool hasActiveTimer = false;
649 std::chrono::milliseconds totalWaitTime {0};
650 auto const start = std::chrono::steady_clock::now();
651 // We try to process pending events as fast as possible to
652 // speed-up the release.
653 int maxEventToProcess = 10;
654
655 do {
656 if (checkEventQueue(maxEventToProcess) < 0)
657 return -1;
658
659 pj_timer_heap_poll(config_.stun_cfg.timer_heap, &timerTimeout);
660 hasActiveTimer = !(timerTimeout.sec == PJ_MAXINT32 && timerTimeout.msec == PJ_MAXINT32);
661
662 if (hasActiveTimer) {
663 pj_time_val_normalize(&timerTimeout);
664 auto waitTime = std::chrono::milliseconds(
665 std::min(PJ_TIME_VAL_MSEC(timerTimeout), PJ_TIME_VAL_MSEC(defaultWaitTime)));
666 std::this_thread::sleep_for(waitTime);
667 totalWaitTime += waitTime;
668 }
669 } while (hasActiveTimer && totalWaitTime < std::chrono::milliseconds(MAX_DESTRUCTION_TIMEOUT));
670
671 auto duration = std::chrono::steady_clock::now() - start;
672 if (logger_)
673 logger_->debug("[ice:{}] Timer heap flushed after {}", fmt::ptr(this), dht::print_duration(duration));
674
675 return static_cast<int>(pj_timer_heap_count(config_.stun_cfg.timer_heap));
676}
677
678int
679IceTransport::Impl::checkEventQueue(int maxEventToPoll)
680{
681 pj_time_val timeout = {0, 0};
682 int eventCount = 0;
683 int events = 0;
684
685 do {
686 events = pj_ioqueue_poll(config_.stun_cfg.ioqueue, &timeout);
687 if (events < 0) {
688 const auto err = pj_get_os_error();
689 if (logger_)
690 logger_->error("[ice:{}] ioqueue error {:d}: {:s}", fmt::ptr(this), err, sip_utils::sip_strerror(err));
691 return events;
692 }
693
694 eventCount += events;
695
696 } while (events > 0 && eventCount < maxEventToPoll);
697
698 return eventCount;
699}
700
701void
702IceTransport::Impl::onComplete(pj_ice_strans*, pj_ice_strans_op op, pj_status_t status)
703{
704 const char* opname = op == PJ_ICE_STRANS_OP_INIT ? "initialization"
705 : op == PJ_ICE_STRANS_OP_NEGOTIATION ? "negotiation"
706 : "unknown_op";
707
708 const bool done = status == PJ_SUCCESS;
709 if (done) {
710 if (logger_)
711 logger_->debug("[ice:{}] {:s} {:s} success",
712 fmt::ptr(this),
713 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
714 opname);
715 } else {
716 if (logger_)
717 logger_->error("[ice:{}] {:s} {:s} failed: {:s}",
718 fmt::ptr(this),
719 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
720 opname,
721 sip_utils::sip_strerror(status));
722 }
723
724 if (done and op == PJ_ICE_STRANS_OP_INIT) {
725 if (initiatorSession_)
726 setInitiatorSession();
727 else
728 setSlaveSession();
729 }
730
731 if (op == PJ_ICE_STRANS_OP_INIT and on_initdone_cb_)
732 on_initdone_cb_(done);
733 else if (op == PJ_ICE_STRANS_OP_NEGOTIATION) {
734 if (done) {
735 // Dump of connection pairs
736 if (logger_)
737 logger_->debug("[ice:{}] {:s} connection pairs ([comp id] local [type] <-> remote [type]):\n{:s}",
738 fmt::ptr(this),
739 (config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"),
740 link());
741 }
742 if (on_negodone_cb_)
743 on_negodone_cb_(done);
744 }
745
746 iceCV_.notify_all();
747}
748
749std::string
750IceTransport::Impl::link() const
751{
752 std::ostringstream out;
753 for (unsigned strm = 1; strm <= streamsCount_ * compCountPerStream_; strm++) {
754 auto absIdx = strm;
755 auto comp = (strm + 1) / compCountPerStream_;
756 auto laddr = getLocalAddress(absIdx);
757 auto raddr = getRemoteAddress(absIdx);
758
759 if (laddr and laddr.getPort() != 0 and raddr and raddr.getPort() != 0) {
760 out << " [" << comp << "] " << laddr.toString(true, true) << " ["
761 << getCandidateType(getSelectedCandidate(absIdx, false)) << "] "
762 << " <-> " << raddr.toString(true, true) << " ["
763 << getCandidateType(getSelectedCandidate(absIdx, true)) << "] " << '\n';
764 } else {
765 out << " [" << comp << "] disabled\n";
766 }
767 }
768 return out.str();
769}
770
771bool
772IceTransport::Impl::setInitiatorSession()
773{
774 if (logger_)
775 logger_->debug("[ice:{}] as master", fmt::ptr(this));
776 initiatorSession_ = true;
777 if (_isInitialized()) {
778 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLING);
779 if (status != PJ_SUCCESS) {
780 if (logger_)
781 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
782 return false;
783 }
784 return true;
785 }
786 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLING);
787}
788
789bool
790IceTransport::Impl::setSlaveSession()
791{
792 if (logger_)
793 logger_->debug("[ice:{}] as slave", fmt::ptr(this));
794 initiatorSession_ = false;
795 if (_isInitialized()) {
796 auto status = pj_ice_strans_change_role(icest_, PJ_ICE_SESS_ROLE_CONTROLLED);
797 if (status != PJ_SUCCESS) {
798 if (logger_)
799 logger_->error("[ice:{}] role change failed: {:s}", fmt::ptr(this), sip_utils::sip_strerror(status));
800 return false;
801 }
802 return true;
803 }
804 return createIceSession(PJ_ICE_SESS_ROLE_CONTROLLED);
805}
806
807const pj_ice_sess_cand*
808IceTransport::Impl::getSelectedCandidate(unsigned comp_id, bool remote) const
809{
810 ASSERT_COMP_ID(comp_id, compCount_);
811
812 // Return the selected candidate pair. Might not be the nominated pair if
813 // ICE has not concluded yet, but should be the nominated pair afterwards.
814 if (not _isRunning()) {
815 if (logger_)
816 logger_->error("[ice:{}] ICE transport is not running", fmt::ptr(this));
817 return nullptr;
818 }
819
820 const auto* sess = pj_ice_strans_get_valid_pair(icest_, comp_id);
821 if (sess == nullptr) {
822 if (logger_)
823 logger_->warn("[ice:{}] Component {} has no valid pair (disabled)", fmt::ptr(this), comp_id);
824 return nullptr;
825 }
826
827 if (remote)
828 return sess->rcand;
829 else
830 return sess->lcand;
831}
832
833IpAddr
834IceTransport::Impl::getLocalAddress(unsigned comp_id) const
835{
836 ASSERT_COMP_ID(comp_id, compCount_);
837
838 if (auto cand = getSelectedCandidate(comp_id, false))
839 return cand->addr;
840
841 return {};
842}
843
844IpAddr
845IceTransport::Impl::getRemoteAddress(unsigned comp_id) const
846{
847 ASSERT_COMP_ID(comp_id, compCount_);
848
849 if (auto cand = getSelectedCandidate(comp_id, true))
850 return cand->addr;
851
852 return {};
853}
854
855const char*
856IceTransport::Impl::getCandidateType(const pj_ice_sess_cand* cand)
857{
858 auto name = cand ? pj_ice_get_cand_type_name(cand->type) : nullptr;
859 return name ? name : "?";
860}
861
862void
863IceTransport::Impl::getUFragPwd()
864{
865 if (icest_) {
866 pj_str_t local_ufrag, local_pwd;
867
868 pj_ice_strans_get_ufrag_pwd(icest_, &local_ufrag, &local_pwd, nullptr, nullptr);
869 local_ufrag_.assign(local_ufrag.ptr, local_ufrag.slen);
870 local_pwd_.assign(local_pwd.ptr, local_pwd.slen);
871 }
872}
873
874bool
875IceTransport::Impl::createIceSession(pj_ice_sess_role role)
876{
877 if (not icest_) {
878 return false;
879 }
880
881 if (pj_ice_strans_init_ice(icest_, role, nullptr, nullptr) != PJ_SUCCESS) {
882 if (logger_)
883 logger_->error("[ice:{}] pj_ice_strans_init_ice() failed", fmt::ptr(this));
884 return false;
885 }
886
887 // Fetch some information on local configuration
888 getUFragPwd();
889
890 if (logger_)
891 logger_->debug("[ice:{}] (local) ufrag=%s, pwd=%s", fmt::ptr(this), local_ufrag_.c_str(), local_pwd_.c_str());
892
893 return true;
894}
895
896bool
897IceTransport::Impl::addStunConfig(int af)
898{
899 if (config_.stun_tp_cnt >= PJ_ICE_MAX_STUN) {
900 if (logger_)
901 logger_->error("Max number of STUN configurations reached (%i)", PJ_ICE_MAX_STUN);
902 return false;
903 }
904
905 if (af != pj_AF_INET() and af != pj_AF_INET6()) {
906 if (logger_)
907 logger_->error("Invalid address familly (%i)", af);
908 return false;
909 }
910
911 auto& stun = config_.stun_tp[config_.stun_tp_cnt++];
912
913 pj_ice_strans_stun_cfg_default(&stun);
914 stun.cfg.max_pkt_size = STUN_MAX_PACKET_SIZE;
915 stun.af = af;
916 stun.conn_type = config_.stun.conn_type;
917
918 if (logger_)
919 logger_->debug("[ice:{}] added host stun config for {:s} transport",
920 fmt::ptr(this),
921 config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
922
923 return true;
924}
925
926void
927IceTransport::Impl::requestUpnpMappings()
928{
929 // Must be called once !
930
931 std::lock_guard<std::mutex> lock(upnpMutex_);
932
933 if (not upnp_)
934 return;
935
936 auto transport = isTcpEnabled() ? PJ_CAND_TCP_PASSIVE : PJ_CAND_UDP;
937 auto portType = transport == PJ_CAND_UDP ? PortType::UDP : PortType::TCP;
938
939 // Request upnp mapping for each component.
940 for (unsigned id = 1; id <= compCount_; id++) {
941 // Set port number to 0 to get any available port.
942 Mapping requestedMap(portType);
943
944 // Request the mapping
945 Mapping::sharedPtr_t mapPtr = upnp_->reserveMapping(requestedMap);
946
947 // To use a mapping, it must be valid, open and has valid host address.
948 if (mapPtr and mapPtr->getMapKey() and (mapPtr->getState() == MappingState::OPEN)
949 and mapPtr->hasValidHostAddress()) {
950 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
951 auto ret = upnpMappings_.emplace(mapPtr->getMapKey(), *mapPtr);
952 if (ret.second) {
953 if (logger_)
954 logger_->debug("[ice:{}] UPNP mapping {:s} successfully allocated",
955 fmt::ptr(this),
956 mapPtr->toString(true));
957 } else {
958 if (logger_)
959 logger_->warn("[ice:{}] UPNP mapping {:s} already in the list!",
960 fmt::ptr(this),
961 mapPtr->toString());
962 }
963 } else {
964 if (logger_)
965 logger_->warn("[ice:{}] UPNP mapping request failed!", fmt::ptr(this));
966 upnp_->releaseMapping(requestedMap);
967 }
968 }
969}
970
971bool
972IceTransport::Impl::hasUpnp() const
973{
974 return upnp_ and upnpMappings_.size() == compCount_;
975}
976
977void
978IceTransport::Impl::addServerReflexiveCandidates(
979 const std::vector<std::pair<IpAddr, IpAddr>>& addrList)
980{
981 if (addrList.size() != compCount_) {
982 if (logger_)
983 logger_->warn("[ice:{}] Provided addr list size {} does not match component count {}",
984 fmt::ptr(this),
985 addrList.size(),
986 compCount_);
987 return;
988 }
989 if (compCount_ > PJ_ICE_MAX_COMP) {
990 if (logger_)
991 logger_->error("[ice:{}] Too many components", fmt::ptr(this));
992 return;
993 }
994
995 // Add config for server reflexive candidates (UPNP or from DHT).
996 if (not addStunConfig(pj_AF_INET()))
997 return;
998
999 assert(config_.stun_tp_cnt > 0 && config_.stun_tp_cnt < PJ_ICE_MAX_STUN);
1000 auto& stun = config_.stun_tp[config_.stun_tp_cnt - 1];
1001
1002 for (unsigned id = 1; id <= compCount_; id++) {
1003 auto idx = id - 1;
1004 auto& localAddr = addrList[idx].first;
1005 auto& publicAddr = addrList[idx].second;
1006
1007 if (logger_)
1008 logger_->debug("[ice:{}] Add srflx reflexive candidates [{:s} : {:s}] for comp {:d}",
1009 fmt::ptr(this),
1010 localAddr.toString(true),
1011 publicAddr.toString(true),
1012 id);
1013
1014 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].local_addr, localAddr.pjPtr());
1015 pj_sockaddr_cp(&stun.cfg.user_mapping[idx].mapped_addr, publicAddr.pjPtr());
1016
1017 if (isTcpEnabled()) {
1018 if (publicAddr.getPort() == 9) {
1019 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_ACTIVE;
1020 } else {
1021 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_TCP_PASSIVE;
1022 }
1023 } else {
1024 stun.cfg.user_mapping[idx].tp_type = PJ_CAND_UDP;
1025 }
1026 }
1027
1028 stun.cfg.user_mapping_cnt = compCount_;
1029}
1030
1031std::vector<std::pair<IpAddr, IpAddr>>
1032IceTransport::Impl::setupGenericReflexiveCandidates()
1033{
1034 if (not accountLocalAddr_) {
1035 if (logger_)
1036 logger_->warn("[ice:{}] Missing local address, generic srflx candidates wont be generated!",
1037 fmt::ptr(this));
1038 return {};
1039 }
1040
1041 if (not accountPublicAddr_) {
1042 if (logger_)
1043 logger_->warn("[ice:{}] Missing public address, generic srflx candidates wont be generated!",
1044 fmt::ptr(this));
1045 return {};
1046 }
1047
1048 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1049 auto isTcp = isTcpEnabled();
1050
1051 addrList.reserve(compCount_);
1052 for (unsigned id = 1; id <= compCount_; id++) {
1053 // For TCP, the type is set to active, because most likely the incoming
1054 // connection will be blocked by the NAT.
1055 // For UDP use random port number.
1056 uint16_t port = isTcp ? 9
1057 : upnp::Controller::generateRandomPort(isTcp ? PortType::TCP
1058 : PortType::UDP);
1059
1060 accountLocalAddr_.setPort(port);
1061 accountPublicAddr_.setPort(port);
1062 addrList.emplace_back(accountLocalAddr_, accountPublicAddr_);
1063 }
1064
1065 return addrList;
1066}
1067
1068std::vector<std::pair<IpAddr, IpAddr>>
1069IceTransport::Impl::setupUpnpReflexiveCandidates()
1070{
1071 // Add UPNP server reflexive candidates if available.
1072 if (not hasUpnp())
1073 return {};
1074
1075 std::lock_guard<std::mutex> lock(upnpMappingsMutex_);
1076
1077 if (upnpMappings_.size() < (size_t)compCount_) {
1078 if (logger_)
1079 logger_->warn("[ice:{}] Not enough mappings {:d}. Expected {:d}",
1080 fmt::ptr(this),
1081 upnpMappings_.size(),
1082 compCount_);
1083 return {};
1084 }
1085
1086 std::vector<std::pair<IpAddr, IpAddr>> addrList;
1087
1088 addrList.reserve(upnpMappings_.size());
1089 for (auto const& [_, map] : upnpMappings_) {
1090 assert(map.getMapKey());
1091 IpAddr localAddr {map.getInternalAddress()};
1092 localAddr.setPort(map.getInternalPort());
1093 IpAddr publicAddr {map.getExternalAddress()};
1094 publicAddr.setPort(map.getExternalPort());
1095 addrList.emplace_back(localAddr, publicAddr);
1096 }
1097
1098 return addrList;
1099}
1100
1101void
1102IceTransport::Impl::setDefaultRemoteAddress(unsigned compId, const IpAddr& addr)
1103{
1104 ASSERT_COMP_ID(compId, compCount_);
1105
1106 iceDefaultRemoteAddr_[compId - 1] = addr;
1107 // The port does not matter. Set it 0 to avoid confusion.
1108 iceDefaultRemoteAddr_[compId - 1].setPort(0);
1109}
1110
1111IpAddr
1112IceTransport::Impl::getDefaultRemoteAddress(unsigned compId) const
1113{
1114 ASSERT_COMP_ID(compId, compCount_);
1115 return iceDefaultRemoteAddr_[compId - 1];
1116}
1117
1118void
1119IceTransport::Impl::onReceiveData(unsigned comp_id, void* pkt, pj_size_t size)
1120{
1121 ASSERT_COMP_ID(comp_id, compCount_);
1122
1123 jami_tracepoint_if_enabled(ice_transport_recv,
1124 reinterpret_cast<uint64_t>(this),
1125 comp_id,
1126 size,
1127 getRemoteAddress(comp_id).toString().c_str());
1128 if (size == 0)
1129 return;
1130
1131 {
1132 auto& io = compIO_[comp_id - 1];
1133 std::lock_guard<std::mutex> lk(io.mutex);
1134
1135 if (io.recvCb) {
1136 io.recvCb((uint8_t*) pkt, size);
1137 return;
1138 }
1139 }
1140
1141 std::error_code ec;
1142 auto err = peerChannels_.at(comp_id - 1).write((const char*) pkt, size, ec);
1143 if (err < 0) {
1144 if (logger_)
1145 logger_->error("[ice:{}] rx: channel is closed", fmt::ptr(this));
1146 }
1147}
1148
1149bool
1150IceTransport::Impl::_waitForInitialization(std::chrono::milliseconds timeout)
1151{
1152 IceLock lk(icest_);
1153
1154 if (not iceCV_.wait_for(lk, timeout, [this] {
1155 return threadTerminateFlags_ or _isInitialized() or _isFailed();
1156 })) {
1157 if (logger_)
1158 logger_->warn("[ice:{}] waitForInitialization: timeout", fmt::ptr(this));
1159 return false;
1160 }
1161
1162 return _isInitialized();
1163}
1164
1165//==============================================================================
1166
1167IceTransport::IceTransport(std::string_view name)
1168 : pimpl_ {std::make_unique<Impl>(name)}
1169{}
1170
1171IceTransport::~IceTransport()
1172{
1173 cancelOperations();
1174}
1175
1176const std::shared_ptr<dht::log::Logger>&
1177IceTransport::logger() const
1178{
1179 return pimpl_->logger_;
1180}
1181
1182void
1183IceTransport::initIceInstance(const IceTransportOptions& options)
1184{
1185 pimpl_->initIceInstance(options);
1186 jami_tracepoint(ice_transport_context, reinterpret_cast<uint64_t>(this));
1187}
1188
1189bool
1190IceTransport::isInitialized() const
1191{
1192 IceLock lk(pimpl_->icest_);
1193 return pimpl_->_isInitialized();
1194}
1195
1196bool
1197IceTransport::isStarted() const
1198{
1199 IceLock lk(pimpl_->icest_);
1200 return pimpl_->_isStarted();
1201}
1202
1203bool
1204IceTransport::isRunning() const
1205{
1206 if (!pimpl_->icest_)
1207 return false;
1208 IceLock lk(pimpl_->icest_);
1209 return pimpl_->_isRunning();
1210}
1211
1212bool
1213IceTransport::isFailed() const
1214{
1215 return pimpl_->_isFailed();
1216}
1217
1218unsigned
1219IceTransport::getComponentCount() const
1220{
1221 return pimpl_->compCount_;
1222}
1223
1224bool
1225IceTransport::setSlaveSession()
1226{
1227 return pimpl_->setSlaveSession();
1228}
1229bool
1230IceTransport::setInitiatorSession()
1231{
1232 return pimpl_->setInitiatorSession();
1233}
1234
1235bool
1236IceTransport::isInitiator() const
1237{
1238 if (isInitialized()) {
1239 return pj_ice_strans_get_role(pimpl_->icest_) == PJ_ICE_SESS_ROLE_CONTROLLING;
1240 }
1241 return pimpl_->initiatorSession_;
1242}
1243
1244bool
1245IceTransport::startIce(const Attribute& rem_attrs, std::vector<IceCandidate>&& rem_candidates)
1246{
1247 if (not isInitialized()) {
1248 if (pimpl_->logger_)
1249 pimpl_->logger_->error("[ice:{}] not initialized transport", fmt::ptr(pimpl_.get()));
1250 pimpl_->is_stopped_ = true;
1251 return false;
1252 }
1253
1254 // pj_ice_strans_start_ice crashes if remote candidates array is empty
1255 if (rem_candidates.empty()) {
1256 if (pimpl_->logger_)
1257 pimpl_->logger_->error("[ice:{}] start failed: no remote candidates", fmt::ptr(pimpl_.get()));
1258 pimpl_->is_stopped_ = true;
1259 return false;
1260 }
1261
1262 auto comp_cnt = std::max(1u, getComponentCount());
1263 if (rem_candidates.size() / comp_cnt > PJ_ICE_ST_MAX_CAND - 1) {
1264 std::vector<IceCandidate> rcands;
1265 rcands.reserve(PJ_ICE_ST_MAX_CAND - 1);
1266 if (pimpl_->logger_)
1267 pimpl_->logger_->warn("[ice:{}] too much candidates detected, trim list.", fmt::ptr(pimpl_.get()));
1268 // Just trim some candidates. To avoid to only take host candidates, iterate
1269 // through the whole list and select some host, some turn and peer reflexives
1270 // It should give at least enough infos to negotiate.
1271 auto maxHosts = 8;
1272 auto maxRelays = PJ_ICE_MAX_TURN;
1273 for (auto& c : rem_candidates) {
1274 if (c.type == PJ_ICE_CAND_TYPE_HOST) {
1275 if (maxHosts == 0)
1276 continue;
1277 maxHosts -= 1;
1278 } else if (c.type == PJ_ICE_CAND_TYPE_RELAYED) {
1279 if (maxRelays == 0)
1280 continue;
1281 maxRelays -= 1;
1282 }
1283 if (rcands.size() == PJ_ICE_ST_MAX_CAND - 1)
1284 break;
1285 rcands.emplace_back(std::move(c));
1286 }
1287 rem_candidates = std::move(rcands);
1288 }
1289
1290 pj_str_t ufrag, pwd;
1291 if (pimpl_->logger_)
1292 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:d} remote candidates)",
1293 fmt::ptr(pimpl_),
1294 rem_candidates.size());
1295
1296 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1297 pj_strset(&ufrag,
1298 (char*) rem_attrs.ufrag.c_str(),
1299 rem_attrs.ufrag.size()),
1300 pj_strset(&pwd,
1301 (char*) rem_attrs.pwd.c_str(),
1302 rem_attrs.pwd.size()),
1303 rem_candidates.size(),
1304 rem_candidates.data());
1305 if (status != PJ_SUCCESS) {
1306 if (pimpl_->logger_)
1307 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_.get()), sip_utils::sip_strerror(status));
1308 pimpl_->is_stopped_ = true;
1309 return false;
1310 }
1311
1312 return true;
1313}
1314
1315bool
1316IceTransport::startIce(const SDP& sdp)
1317{
1318 if (pimpl_->streamsCount_ != 1) {
1319 if (pimpl_->logger_)
1320 pimpl_->logger_->error(FMT_STRING("Expected exactly one stream per SDP (found {:u} streams)"), pimpl_->streamsCount_);
1321 return false;
1322 }
1323
1324 if (not isInitialized()) {
1325 if (pimpl_->logger_)
1326 pimpl_->logger_->error(FMT_STRING("[ice:{}] not initialized transport"), fmt::ptr(pimpl_));
1327 pimpl_->is_stopped_ = true;
1328 return false;
1329 }
1330
1331 for (unsigned id = 1; id <= getComponentCount(); id++) {
1332 auto candVec = getLocalCandidates(id);
1333 for (auto const& cand : candVec) {
1334 if (pimpl_->logger_)
1335 pimpl_->logger_->debug("[ice:{}] Using local candidate {:s} for comp {:d}",
1336 fmt::ptr(pimpl_), cand, id);
1337 }
1338 }
1339
1340 if (pimpl_->logger_)
1341 pimpl_->logger_->debug("[ice:{}] negotiation starting ({:u} remote candidates)",
1342 fmt::ptr(pimpl_), sdp.candidates.size());
1343 pj_str_t ufrag, pwd;
1344
1345 std::vector<IceCandidate> rem_candidates;
1346 rem_candidates.reserve(sdp.candidates.size());
1347 IceCandidate cand;
1348 for (const auto& line : sdp.candidates) {
1349 if (parseIceAttributeLine(0, line, cand))
1350 rem_candidates.emplace_back(cand);
1351 }
1352
1353 auto status = pj_ice_strans_start_ice(pimpl_->icest_,
1354 pj_strset(&ufrag,
1355 (char*) sdp.ufrag.c_str(),
1356 sdp.ufrag.size()),
1357 pj_strset(&pwd, (char*) sdp.pwd.c_str(), sdp.pwd.size()),
1358 rem_candidates.size(),
1359 rem_candidates.data());
1360 if (status != PJ_SUCCESS) {
1361 if (pimpl_->logger_)
1362 pimpl_->logger_->error("[ice:{}] start failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1363 pimpl_->is_stopped_ = true;
1364 return false;
1365 }
1366
1367 return true;
1368}
1369
1370void
1371IceTransport::cancelOperations()
1372{
1373 pimpl_->cancelOperations();
1374}
1375
1376IpAddr
1377IceTransport::getLocalAddress(unsigned comp_id) const
1378{
1379 return pimpl_->getLocalAddress(comp_id);
1380}
1381
1382IpAddr
1383IceTransport::getRemoteAddress(unsigned comp_id) const
1384{
1385 // Return the default remote address if set.
1386 // Note that the default remote addresses are the addresses
1387 // set in the 'c=' and 'a=rtcp' lines of the received SDP.
1388 // See pj_ice_strans_sendto2() for more details.
1389 if (auto defaultAddr = pimpl_->getDefaultRemoteAddress(comp_id)) {
1390 return defaultAddr;
1391 }
1392
1393 return pimpl_->getRemoteAddress(comp_id);
1394}
1395
1396const IceTransport::Attribute
1397IceTransport::getLocalAttributes() const
1398{
1399 return {pimpl_->local_ufrag_, pimpl_->local_pwd_};
1400}
1401
1402std::vector<std::string>
1403IceTransport::getLocalCandidates(unsigned comp_id) const
1404{
1405 ASSERT_COMP_ID(comp_id, getComponentCount());
1406 std::vector<std::string> res;
1407 pj_ice_sess_cand cand[MAX_CANDIDATES];
1408 unsigned cand_cnt = PJ_ARRAY_SIZE(cand);
1409
1410 if (!isInitialized()) {
1411 return res;
1412 }
1413
1414 if (pj_ice_strans_enum_cands(pimpl_->icest_, comp_id, &cand_cnt, cand) != PJ_SUCCESS) {
1415 if (pimpl_->logger_)
1416 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1417 return res;
1418 }
1419
1420 res.reserve(cand_cnt);
1421 for (unsigned i = 0; i < cand_cnt; ++i) {
1422 /** Section 4.5, RFC 6544 (https://tools.ietf.org/html/rfc6544)
1423 * candidate-attribute = "candidate" ":" foundation SP component-id
1424 * SP "TCP" SP priority SP connection-address SP port SP cand-type [SP
1425 * rel-addr] [SP rel-port] SP tcp-type-ext
1426 * *(SP extension-att-name SP
1427 * extension-att-value)
1428 *
1429 * tcp-type-ext = "tcptype" SP tcp-type
1430 * tcp-type = "active" / "passive" / "so"
1431 */
1432 char ipaddr[PJ_INET6_ADDRSTRLEN];
1433 std::string tcp_type;
1434 if (cand[i].transport != PJ_CAND_UDP) {
1435 tcp_type += " tcptype";
1436 switch (cand[i].transport) {
1437 case PJ_CAND_TCP_ACTIVE:
1438 tcp_type += " active";
1439 break;
1440 case PJ_CAND_TCP_PASSIVE:
1441 tcp_type += " passive";
1442 break;
1443 case PJ_CAND_TCP_SO:
1444 default:
1445 tcp_type += " so";
1446 break;
1447 }
1448 }
1449 res.emplace_back(
1450 fmt::format("{} {} {} {} {} {} typ {}{}",
1451 sip_utils::as_view(cand[i].foundation),
1452 cand[i].comp_id,
1453 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1454 cand[i].prio,
1455 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1456 pj_sockaddr_get_port(&cand[i].addr),
1457 pj_ice_get_cand_type_name(cand[i].type),
1458 tcp_type));
1459 }
1460
1461 return res;
1462}
1463std::vector<std::string>
1464IceTransport::getLocalCandidates(unsigned streamIdx, unsigned compId) const
1465{
1466 ASSERT_COMP_ID(compId, getComponentCount());
1467
1468 std::vector<std::string> res;
1469 pj_ice_sess_cand cand[MAX_CANDIDATES];
1470 unsigned cand_cnt = MAX_CANDIDATES;
1471
1472 if (not isInitialized()) {
1473 return res;
1474 }
1475
1476 // In the implementation, the component IDs are enumerated globally
1477 // (per SDP: 1, 2, 3, 4, ...). This is simpler because we create
1478 // only one pj_ice_strans instance. However, the component IDs are
1479 // enumerated per stream in the generated SDP (1, 2, 1, 2, ...) in
1480 // order to be compliant with the spec.
1481
1482 auto globalCompId = streamIdx * 2 + compId;
1483 if (pj_ice_strans_enum_cands(pimpl_->icest_, globalCompId, &cand_cnt, cand) != PJ_SUCCESS) {
1484 if (pimpl_->logger_)
1485 pimpl_->logger_->error("[ice:{}] pj_ice_strans_enum_cands() failed", fmt::ptr(pimpl_));
1486 return res;
1487 }
1488
1489 res.reserve(cand_cnt);
1490 // Build ICE attributes according to RFC 6544, section 4.5.
1491 for (unsigned i = 0; i < cand_cnt; ++i) {
1492 char ipaddr[PJ_INET6_ADDRSTRLEN];
1493 std::string tcp_type;
1494 if (cand[i].transport != PJ_CAND_UDP) {
1495 tcp_type += " tcptype";
1496 switch (cand[i].transport) {
1497 case PJ_CAND_TCP_ACTIVE:
1498 tcp_type += " active";
1499 break;
1500 case PJ_CAND_TCP_PASSIVE:
1501 tcp_type += " passive";
1502 break;
1503 case PJ_CAND_TCP_SO:
1504 default:
1505 tcp_type += " so";
1506 break;
1507 }
1508 }
1509 res.emplace_back(
1510 fmt::format("{} {} {} {} {} {} typ {}{}",
1511 sip_utils::as_view(cand[i].foundation),
1512 compId,
1513 (cand[i].transport == PJ_CAND_UDP ? "UDP" : "TCP"),
1514 cand[i].prio,
1515 pj_sockaddr_print(&cand[i].addr, ipaddr, sizeof(ipaddr), 0),
1516 pj_sockaddr_get_port(&cand[i].addr),
1517 pj_ice_get_cand_type_name(cand[i].type),
1518 tcp_type));
1519 }
1520
1521 return res;
1522}
1523
1524bool
1525IceTransport::parseIceAttributeLine(unsigned streamIdx,
1526 const std::string& line,
1527 IceCandidate& cand) const
1528{
1529 // Silently ignore empty lines
1530 if (line.empty())
1531 return false;
1532
1533 if (streamIdx >= pimpl_->streamsCount_) {
1534 throw std::runtime_error(fmt::format("Stream index {:d} is invalid!", streamIdx));
1535 }
1536
1537 int af, cnt;
1538 char foundation[32], transport[12], ipaddr[80], type[32], tcp_type[32];
1539 pj_str_t tmpaddr;
1540 unsigned comp_id, prio, port;
1541 pj_status_t status;
1542 pj_bool_t is_tcp = PJ_FALSE;
1543
1544 // Parse ICE attribute line according to RFC-6544 section 4.5.
1545 // TODO/WARNING: There is no fail-safe in case of malformed attributes.
1546 cnt = sscanf(line.c_str(),
1547 "%31s %u %11s %u %79s %u typ %31s tcptype %31s\n",
1548 foundation,
1549 &comp_id,
1550 transport,
1551 &prio,
1552 ipaddr,
1553 &port,
1554 type,
1555 tcp_type);
1556 if (cnt != 7 && cnt != 8) {
1557 if (pimpl_->logger_)
1558 pimpl_->logger_->error("[ice:{}] Invalid ICE candidate line: {:s}", fmt::ptr(pimpl_), line);
1559 return false;
1560 }
1561
1562 if (strcmp(transport, "TCP") == 0) {
1563 is_tcp = PJ_TRUE;
1564 }
1565
1566 pj_bzero(&cand, sizeof(IceCandidate));
1567
1568 if (strcmp(type, "host") == 0)
1569 cand.type = PJ_ICE_CAND_TYPE_HOST;
1570 else if (strcmp(type, "srflx") == 0)
1571 cand.type = PJ_ICE_CAND_TYPE_SRFLX;
1572 else if (strcmp(type, "prflx") == 0)
1573 cand.type = PJ_ICE_CAND_TYPE_PRFLX;
1574 else if (strcmp(type, "relay") == 0)
1575 cand.type = PJ_ICE_CAND_TYPE_RELAYED;
1576 else {
1577 if (pimpl_->logger_)
1578 pimpl_->logger_->warn("[ice:{}] invalid remote candidate type '{:s}'", fmt::ptr(pimpl_), type);
1579 return false;
1580 }
1581
1582 if (is_tcp) {
1583 if (strcmp(tcp_type, "active") == 0)
1584 cand.transport = PJ_CAND_TCP_ACTIVE;
1585 else if (strcmp(tcp_type, "passive") == 0)
1586 cand.transport = PJ_CAND_TCP_PASSIVE;
1587 else if (strcmp(tcp_type, "so") == 0)
1588 cand.transport = PJ_CAND_TCP_SO;
1589 else {
1590 if (pimpl_->logger_)
1591 pimpl_->logger_->warn("[ice:{}] invalid transport type type '{:s}'", fmt::ptr(pimpl_), tcp_type);
1592 return false;
1593 }
1594 } else {
1595 cand.transport = PJ_CAND_UDP;
1596 }
1597
1598 // If the component Id is enumerated relative to media, convert
1599 // it to absolute enumeration.
1600 if (comp_id <= pimpl_->compCountPerStream_) {
1601 comp_id += pimpl_->compCountPerStream_ * streamIdx;
1602 }
1603 cand.comp_id = (pj_uint8_t) comp_id;
1604
1605 cand.prio = prio;
1606
1607 if (strchr(ipaddr, ':'))
1608 af = pj_AF_INET6();
1609 else {
1610 af = pj_AF_INET();
1611 pimpl_->onlyIPv4Private_ &= IpAddr(ipaddr).isPrivate();
1612 }
1613
1614 tmpaddr = pj_str(ipaddr);
1615 pj_sockaddr_init(af, &cand.addr, NULL, 0);
1616 status = pj_sockaddr_set_str_addr(af, &cand.addr, &tmpaddr);
1617 if (status != PJ_SUCCESS) {
1618 if (pimpl_->logger_)
1619 pimpl_->logger_->warn("[ice:{}] invalid IP address '{:s}'", fmt::ptr(pimpl_), ipaddr);
1620 return false;
1621 }
1622
1623 pj_sockaddr_set_port(&cand.addr, (pj_uint16_t) port);
1624 pj_strdup2(pimpl_->pool_.get(), &cand.foundation, foundation);
1625
1626 return true;
1627}
1628
1629ssize_t
1630IceTransport::recv(unsigned compId, unsigned char* buf, size_t len, std::error_code& ec)
1631{
1632 ASSERT_COMP_ID(compId, getComponentCount());
1633 auto& io = pimpl_->compIO_[compId - 1];
1634 std::lock_guard<std::mutex> lk(io.mutex);
1635
1636 if (io.queue.empty()) {
1637 ec = std::make_error_code(std::errc::resource_unavailable_try_again);
1638 return -1;
1639 }
1640
1641 auto& packet = io.queue.front();
1642 const auto count = std::min(len, packet.data.size());
1643 std::copy_n(packet.data.begin(), count, buf);
1644 if (count == packet.data.size()) {
1645 io.queue.pop_front();
1646 } else {
1647 packet.data.erase(packet.data.begin(), packet.data.begin() + count);
1648 }
1649
1650 ec.clear();
1651 return count;
1652}
1653
1654ssize_t
1655IceTransport::recvfrom(unsigned compId, char* buf, size_t len, std::error_code& ec)
1656{
1657 ASSERT_COMP_ID(compId, getComponentCount());
1658 return pimpl_->peerChannels_.at(compId - 1).read(buf, len, ec);
1659}
1660
1661void
1662IceTransport::setOnRecv(unsigned compId, IceRecvCb cb)
1663{
1664 ASSERT_COMP_ID(compId, getComponentCount());
1665
1666 auto& io = pimpl_->compIO_[compId - 1];
1667 std::lock_guard<std::mutex> lk(io.mutex);
1668 io.recvCb = std::move(cb);
1669
1670 if (io.recvCb) {
1671 // Flush existing queue using the callback
1672 for (const auto& packet : io.queue)
1673 io.recvCb((uint8_t*) packet.data.data(), packet.data.size());
1674 io.queue.clear();
1675 }
1676}
1677
1678void
1679IceTransport::setOnShutdown(onShutdownCb&& cb)
1680{
1681 pimpl_->scb = cb;
1682}
1683
1684ssize_t
1685IceTransport::send(unsigned compId, const unsigned char* buf, size_t len)
1686{
1687 ASSERT_COMP_ID(compId, getComponentCount());
1688
1689 auto remote = getRemoteAddress(compId);
1690
1691 if (!remote) {
1692 if (pimpl_->logger_)
1693 pimpl_->logger_->error("[ice:{}] can't find remote address for component {:d}", fmt::ptr(pimpl_), compId);
1694 errno = EINVAL;
1695 return -1;
1696 }
1697
1698 std::unique_lock dlk(pimpl_->sendDataMutex_, std::defer_lock);
1699 if (isTCPEnabled())
1700 dlk.lock();
1701
1702 jami_tracepoint(ice_transport_send,
1703 reinterpret_cast<uint64_t>(this),
1704 compId,
1705 len,
1706 remote.toString().c_str());
1707
1708 auto status = pj_ice_strans_sendto2(pimpl_->icest_,
1709 compId,
1710 buf,
1711 len,
1712 remote.pjPtr(),
1713 remote.getLength());
1714
1715 jami_tracepoint(ice_transport_send_status, status);
1716
1717 if (status == PJ_EPENDING && isTCPEnabled()) {
1718 // NOTE; because we are in TCP, the sent size will count the header (2
1719 // bytes length).
1720 pimpl_->waitDataCv_.wait(dlk, [&] {
1721 return pimpl_->lastSentLen_ >= static_cast<pj_size_t>(len) or pimpl_->destroying_;
1722 });
1723 pimpl_->lastSentLen_ = 0;
1724 } else if (status != PJ_SUCCESS && status != PJ_EPENDING) {
1725 if (status == PJ_EBUSY) {
1726 errno = EAGAIN;
1727 } else {
1728 if (pimpl_->logger_)
1729 pimpl_->logger_->error("[ice:{}] ice send failed: {:s}", fmt::ptr(pimpl_), sip_utils::sip_strerror(status));
1730 errno = EIO;
1731 }
1732 return -1;
1733 }
1734
1735 return len;
1736}
1737
1738bool
1739IceTransport::waitForInitialization(std::chrono::milliseconds timeout)
1740{
1741 return pimpl_->_waitForInitialization(timeout);
1742}
1743
1744ssize_t
1745IceTransport::waitForData(unsigned compId, std::chrono::milliseconds timeout, std::error_code& ec)
1746{
1747 ASSERT_COMP_ID(compId, getComponentCount());
1748 return pimpl_->peerChannels_.at(compId - 1).wait(timeout, ec);
1749}
1750
1751bool
1752IceTransport::isTCPEnabled()
1753{
1754 return pimpl_->isTcpEnabled();
1755}
1756
1757ICESDP
1758IceTransport::parseIceCandidates(std::string_view sdp_msg)
1759{
1760 if (pimpl_->streamsCount_ != 1) {
1761 if (pimpl_->logger_)
1762 pimpl_->logger_->error("Expected exactly one stream per SDP (found %u streams)", pimpl_->streamsCount_);
1763 return {};
1764 }
1765
1766 ICESDP res;
1767 int nr = 0;
1768 for (std::string_view line; jami::getline(sdp_msg, line); nr++) {
1769 if (nr == 0) {
1770 res.rem_ufrag = line;
1771 } else if (nr == 1) {
1772 res.rem_pwd = line;
1773 } else {
1774 IceCandidate cand;
1775 if (parseIceAttributeLine(0, std::string(line), cand)) {
1776 if (pimpl_->logger_)
1777 pimpl_->logger_->debug("[ice:{}] Add remote candidate: {}",
1778 fmt::ptr(pimpl_),
1779 line);
1780 res.rem_candidates.emplace_back(cand);
1781 }
1782 }
1783 }
1784 return res;
1785}
1786
1787void
1788IceTransport::setDefaultRemoteAddress(unsigned comp_id, const IpAddr& addr)
1789{
1790 pimpl_->setDefaultRemoteAddress(comp_id, addr);
1791}
1792
1793std::string
1794IceTransport::link() const
1795{
1796 return pimpl_->link();
1797}
1798
1799//==============================================================================
1800
1801IceTransportFactory::IceTransportFactory()
1802 : cp_(new pj_caching_pool(),
1803 [](pj_caching_pool* p) {
1804 pj_caching_pool_destroy(p);
1805 delete p;
1806 })
1807 , ice_cfg_()
1808{
1809 pj_caching_pool_init(cp_.get(), NULL, 0);
1810
1811 pj_ice_strans_cfg_default(&ice_cfg_);
1812 ice_cfg_.stun_cfg.pf = &cp_->factory;
1813
1814 // v2.4.5 of PJNATH has a default of 100ms but RFC 5389 since version 14 requires
1815 // a minimum of 500ms on fixed-line links. Our usual case is wireless links.
1816 // This solves too long ICE exchange by DHT.
1817 // Using 500ms with default PJ_STUN_MAX_TRANSMIT_COUNT (7) gives around 33s before timeout.
1818 ice_cfg_.stun_cfg.rto_msec = 500;
1819
1820 // See https://tools.ietf.org/html/rfc5245#section-8.1.1.2
1821 // If enabled, it may help speed-up the connectivity, but may cause
1822 // the nomination of sub-optimal pairs.
1823 ice_cfg_.opt.aggressive = PJ_FALSE;
1824}
1825
1826IceTransportFactory::~IceTransportFactory() {}
1827
1828std::shared_ptr<IceTransport>
1829IceTransportFactory::createTransport(std::string_view name)
1830{
1831 try {
1832 return std::make_shared<IceTransport>(name);
1833 } catch (const std::exception& e) {
1834 //JAMI_ERR("%s", e.what());
1835 return nullptr;
1836 }
1837}
1838
1839std::unique_ptr<IceTransport>
1840IceTransportFactory::createUTransport(std::string_view name)
1841{
1842 try {
1843 return std::make_unique<IceTransport>(name);
1844 } catch (const std::exception& e) {
1845 //JAMI_ERR("%s", e.what());
1846 return nullptr;
1847 }
1848}
1849
1850//==============================================================================
1851
1852void
1853IceSocket::close()
1854{
1855 if (ice_transport_)
1856 ice_transport_->setOnRecv(compId_, {});
1857 ice_transport_.reset();
1858}
1859
1860ssize_t
1861IceSocket::send(const unsigned char* buf, size_t len)
1862{
1863 if (not ice_transport_)
1864 return -1;
1865 return ice_transport_->send(compId_, buf, len);
1866}
1867
1868ssize_t
1869IceSocket::waitForData(std::chrono::milliseconds timeout)
1870{
1871 if (not ice_transport_)
1872 return -1;
1873
1874 std::error_code ec;
1875 return ice_transport_->waitForData(compId_, timeout, ec);
1876}
1877
1878void
1879IceSocket::setOnRecv(IceRecvCb cb)
1880{
1881 if (ice_transport_)
1882 ice_transport_->setOnRecv(compId_, cb);
1883}
1884
1885uint16_t
1886IceSocket::getTransportOverhead()
1887{
1888 if (not ice_transport_)
1889 return 0;
1890
1891 return (ice_transport_->getRemoteAddress(compId_).getFamily() == AF_INET) ? IPV4_HEADER_SIZE
1892 : IPV6_HEADER_SIZE;
1893}
1894
1895void
1896IceSocket::setDefaultRemoteAddress(const IpAddr& addr)
1897{
1898 if (ice_transport_)
1899 ice_transport_->setDefaultRemoteAddress(compId_, addr);
1900}
1901
1902} // namespace jami