blob: 0bbe5bfd648ee6bff152e1c52bef32897d0a963f [file] [log] [blame]
Adrien Béraudefe27372023-05-27 18:56:29 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraudefe27372023-05-27 18:56:29 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraudefe27372023-05-27 18:56:29 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraudefe27372023-05-27 18:56:29 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraudefe27372023-05-27 18:56:29 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
Adrien Béraudefe27372023-05-27 18:56:29 -040017
18#include "connectionmanager.h"
19#include "multiplexed_socket.h"
20#include "test_runner.h"
Morteza Namvar82960b32023-07-04 17:08:22 -040021#include "certstore.h"
Adrien Béraudefe27372023-05-27 18:56:29 -040022
Adrien Béraud027af2a2023-08-27 12:08:50 -040023#include <opendht/log.h>
24#include <asio/executor_work_guard.hpp>
25#include <asio/io_context.hpp>
Adrien Béraud75754b22023-10-17 09:16:06 -040026#include <fmt/compile.h>
Adrien Béraud027af2a2023-08-27 12:08:50 -040027
28#include <cppunit/TestAssert.h>
29#include <cppunit/TestFixture.h>
30#include <cppunit/extensions/HelperMacros.h>
31
32#include <condition_variable>
33#include <iostream>
34#include <filesystem>
35
Adrien Béraudefe27372023-05-27 18:56:29 -040036using namespace std::literals::chrono_literals;
37
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraudefe27372023-05-27 18:56:29 -040039namespace test {
40
Amnab8c33bb2023-08-03 14:40:01 -040041struct ConnectionHandler
42{
Morteza Namvar82960b32023-07-04 17:08:22 -040043 dht::crypto::Identity id;
44 std::shared_ptr<Logger> logger;
45 std::shared_ptr<tls::CertificateStore> certStore;
46 std::shared_ptr<dht::DhtRunner> dht;
47 std::shared_ptr<ConnectionManager> connectionManager;
48 std::shared_ptr<asio::io_context> ioContext;
Amnab8c33bb2023-08-03 14:40:01 -040049 std::shared_ptr<std::thread> ioContextRunner;
Morteza Namvar82960b32023-07-04 17:08:22 -040050};
51
Adrien Béraudefe27372023-05-27 18:56:29 -040052class ConnectionManagerTest : public CppUnit::TestFixture
53{
54public:
Adrien Béraud4796de12023-09-25 14:46:47 -040055 ConnectionManagerTest() {
56 pj_log_set_level(0);
57 pj_log_set_log_func([](int level, const char* data, int /*len*/) {});
58 // logger->debug("Using PJSIP version {} for {}", pj_get_version(), PJ_OS_NAME);
59 // logger->debug("Using GnuTLS version {}", gnutls_check_version(nullptr));
60 // logger->debug("Using OpenDHT version {}", dht::version());
61 }
Amnab8c33bb2023-08-03 14:40:01 -040062 ~ConnectionManagerTest() {}
Adrien Béraudefe27372023-05-27 18:56:29 -040063 static std::string name() { return "ConnectionManager"; }
64 void setUp();
65 void tearDown();
66
Adrien Béraud4796de12023-09-25 14:46:47 -040067 std::shared_ptr<dht::DhtRunner> bootstrap_node;
68 dht::crypto::Identity org1Id, org2Id;
69 dht::crypto::Identity aliceId, bobId;
70 dht::crypto::Identity aliceDevice1Id, bobDevice1Id;
71
Morteza Namvar82960b32023-07-04 17:08:22 -040072 std::unique_ptr<ConnectionHandler> alice;
73 std::unique_ptr<ConnectionHandler> bob;
Adrien Béraudefe27372023-05-27 18:56:29 -040074
Morteza Namvar82960b32023-07-04 17:08:22 -040075 std::mutex mtx;
76 std::shared_ptr<asio::io_context> ioContext;
Amnab8c33bb2023-08-03 14:40:01 -040077 std::shared_ptr<std::thread> ioContextRunner;
Adrien Béraud4796de12023-09-25 14:46:47 -040078 std::shared_ptr<Logger> logger = dht::log::getStdLogger();
Amna81221ad2023-09-14 17:33:26 -040079 std::shared_ptr<IceTransportFactory> factory;
Morteza Namvar82960b32023-07-04 17:08:22 -040080
Amnab8c33bb2023-08-03 14:40:01 -040081private:
Adrien Béraud4796de12023-09-25 14:46:47 -040082 std::unique_ptr<ConnectionHandler> setupHandler(const dht::crypto::Identity& id, const std::string& bootstrap = "bootstrap.jami.net");
Morteza Namvar82960b32023-07-04 17:08:22 -040083
Amnab8c33bb2023-08-03 14:40:01 -040084 void testConnectDevice();
85 void testAcceptConnection();
Adrien Béraud8d787732023-10-16 12:58:17 -040086 void testManyChannels();
Amnab8c33bb2023-08-03 14:40:01 -040087 void testMultipleChannels();
88 void testMultipleChannelsOneDeclined();
89 void testMultipleChannelsSameName();
90 void testDeclineConnection();
91 void testSendReceiveData();
92 void testAcceptsICERequest();
93 void testDeclineICERequest();
94 void testChannelRcvShutdown();
95 void testChannelSenderShutdown();
96 void testCloseConnectionWith();
97 void testShutdownCallbacks();
98 void testFloodSocket();
99 void testDestroyWhileSending();
100 void testIsConnecting();
101 void testCanSendBeacon();
102 void testCannotSendBeacon();
103 void testConnectivityChangeTriggerBeacon();
104 void testOnNoBeaconTriggersShutdown();
105 void testShutdownWhileNegotiating();
106 void testGetChannelList();
Adrien Béraudefe27372023-05-27 18:56:29 -0400107 CPPUNIT_TEST_SUITE(ConnectionManagerTest);
Amnab8c33bb2023-08-03 14:40:01 -0400108 CPPUNIT_TEST(testDeclineICERequest);
109 CPPUNIT_TEST(testConnectDevice);
110 CPPUNIT_TEST(testIsConnecting);
111 CPPUNIT_TEST(testAcceptConnection);
112 CPPUNIT_TEST(testDeclineConnection);
Adrien Béraud8d787732023-10-16 12:58:17 -0400113 CPPUNIT_TEST(testManyChannels);
Amnab8c33bb2023-08-03 14:40:01 -0400114 CPPUNIT_TEST(testMultipleChannels);
115 CPPUNIT_TEST(testMultipleChannelsOneDeclined);
116 CPPUNIT_TEST(testMultipleChannelsSameName);
117 CPPUNIT_TEST(testSendReceiveData);
118 CPPUNIT_TEST(testAcceptsICERequest);
119 CPPUNIT_TEST(testChannelRcvShutdown);
120 CPPUNIT_TEST(testChannelSenderShutdown);
121 CPPUNIT_TEST(testCloseConnectionWith);
122 CPPUNIT_TEST(testShutdownCallbacks);
123 CPPUNIT_TEST(testFloodSocket);
124 CPPUNIT_TEST(testDestroyWhileSending);
125 CPPUNIT_TEST(testCanSendBeacon);
126 CPPUNIT_TEST(testCannotSendBeacon);
127 CPPUNIT_TEST(testConnectivityChangeTriggerBeacon);
128 CPPUNIT_TEST(testOnNoBeaconTriggersShutdown);
129 CPPUNIT_TEST(testShutdownWhileNegotiating);
130 CPPUNIT_TEST(testGetChannelList);
Adrien Béraudefe27372023-05-27 18:56:29 -0400131 CPPUNIT_TEST_SUITE_END();
132};
133
134CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(ConnectionManagerTest, ConnectionManagerTest::name());
135
Morteza Namvar82960b32023-07-04 17:08:22 -0400136std::unique_ptr<ConnectionHandler>
Adrien Béraud4796de12023-09-25 14:46:47 -0400137ConnectionManagerTest::setupHandler(const dht::crypto::Identity& id, const std::string& bootstrap)
Amnab8c33bb2023-08-03 14:40:01 -0400138{
Morteza Namvar82960b32023-07-04 17:08:22 -0400139 auto h = std::make_unique<ConnectionHandler>();
Adrien Béraud4796de12023-09-25 14:46:47 -0400140 h->id = id;
141 h->logger = {};//logger;
142 h->certStore = std::make_shared<tls::CertificateStore>(id.second->getName(), nullptr/*h->logger*/);
Amnab8c33bb2023-08-03 14:40:01 -0400143 h->ioContext = ioContext;
Amnab8c33bb2023-08-03 14:40:01 -0400144 h->ioContextRunner = ioContextRunner;
Morteza Namvar82960b32023-07-04 17:08:22 -0400145
146 dht::DhtRunner::Config dhtConfig;
147 dhtConfig.dht_config.id = h->id;
148 dhtConfig.threaded = true;
149
150 dht::DhtRunner::Context dhtContext;
Amnab8c33bb2023-08-03 14:40:01 -0400151 dhtContext.certificateStore = [c = h->certStore](const dht::InfoHash& pk_id) {
Morteza Namvar82960b32023-07-04 17:08:22 -0400152 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
153 if (auto cert = c->getCertificate(pk_id.toString()))
154 ret.emplace_back(std::move(cert));
155 return ret;
156 };
Amnab8c33bb2023-08-03 14:40:01 -0400157 // dhtContext.logger = h->logger;
Morteza Namvar82960b32023-07-04 17:08:22 -0400158
159 h->dht = std::make_shared<dht::DhtRunner>();
160 h->dht->run(dhtConfig, std::move(dhtContext));
Adrien Béraud4796de12023-09-25 14:46:47 -0400161 h->dht->bootstrap(bootstrap);
Morteza Namvar82960b32023-07-04 17:08:22 -0400162
163 auto config = std::make_shared<ConnectionManager::Config>();
164 config->dht = h->dht;
165 config->id = h->id;
166 config->ioContext = h->ioContext;
Amna81221ad2023-09-14 17:33:26 -0400167 config->factory = factory;
Adrien Béraud4796de12023-09-25 14:46:47 -0400168 // config->logger = logger;
Amna81221ad2023-09-14 17:33:26 -0400169 config->certStore = h->certStore;
Adrien Béraud4796de12023-09-25 14:46:47 -0400170 config->cachePath = std::filesystem::current_path() / id.second->getName() / "temp";
Morteza Namvar82960b32023-07-04 17:08:22 -0400171
172 h->connectionManager = std::make_shared<ConnectionManager>(config);
Amnab8c33bb2023-08-03 14:40:01 -0400173 h->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraud4796de12023-09-25 14:46:47 -0400174 h->connectionManager->onDhtConnected(h->id.first->getPublicKey());
175
Morteza Namvar82960b32023-07-04 17:08:22 -0400176 return h;
177}
178
Adrien Béraudefe27372023-05-27 18:56:29 -0400179void
180ConnectionManagerTest::setUp()
181{
Adrien Béraud4796de12023-09-25 14:46:47 -0400182 if (not org1Id.first) {
183 org1Id = dht::crypto::generateIdentity("org1");
184 org2Id = dht::crypto::generateIdentity("org2");
185 aliceId = dht::crypto::generateIdentity("alice", org1Id, 2048, true);
186 bobId = dht::crypto::generateIdentity("bob", org2Id, 2048, true);
187 aliceDevice1Id = dht::crypto::generateIdentity("aliceDevice1", aliceId);
188 bobDevice1Id = dht::crypto::generateIdentity("bobDevice1", bobId);
189 }
Adrien Béraudc631a832023-07-26 22:19:00 -0400190
Morteza Namvar82960b32023-07-04 17:08:22 -0400191 ioContext = std::make_shared<asio::io_context>();
Amnab8c33bb2023-08-03 14:40:01 -0400192 ioContextRunner = std::make_shared<std::thread>([context = ioContext]() {
Morteza Namvar82960b32023-07-04 17:08:22 -0400193 try {
194 auto work = asio::make_work_guard(*context);
195 context->run();
196 } catch (const std::exception& ex) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400197 fmt::print("Exception in ioContextRunner: {}\n", ex.what());
Morteza Namvar82960b32023-07-04 17:08:22 -0400198 }
199 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400200 bootstrap_node = std::make_shared<dht::DhtRunner>();
201 bootstrap_node->run(36432);
202
203 factory = std::make_unique<IceTransportFactory>(/*logger*/);
204 alice = setupHandler(aliceDevice1Id, "127.0.0.1:36432");
205 bob = setupHandler(bobDevice1Id, "127.0.0.1:36432");
Adrien Béraudefe27372023-05-27 18:56:29 -0400206}
207
208void
209ConnectionManagerTest::tearDown()
210{
Amnab8c33bb2023-08-03 14:40:01 -0400211 // wait_for_removal_of({aliceId, bobId});
212 // Stop the io_context and join the ioContextRunner thread
Morteza Namvar82960b32023-07-04 17:08:22 -0400213 ioContext->stop();
Amnab8c33bb2023-08-03 14:40:01 -0400214
215 if (ioContextRunner && ioContextRunner->joinable()) {
216 ioContextRunner->join();
217 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400218
219 bootstrap_node.reset();
220 alice.reset();
221 bob.reset();
222 factory.reset();
Adrien Béraudefe27372023-05-27 18:56:29 -0400223}
Adrien Béraudefe27372023-05-27 18:56:29 -0400224void
Morteza Namvar82960b32023-07-04 17:08:22 -0400225ConnectionManagerTest::testConnectDevice()
Adrien Béraudefe27372023-05-27 18:56:29 -0400226{
Morteza Namvar82960b32023-07-04 17:08:22 -0400227 std::condition_variable bobConVar;
228 bool isBobRecvChanlReq = false;
Amnab8c33bb2023-08-03 14:40:01 -0400229 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400230 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400231 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400232 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400233 isBobRecvChanlReq = name == "dumyName";
234 bobConVar.notify_one();
235 return true;
236 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400237
Morteza Namvar82960b32023-07-04 17:08:22 -0400238 std::condition_variable alicConVar;
239 bool isAlicConnected = false;
Adrien Béraud4796de12023-09-25 14:46:47 -0400240 alice->connectionManager->connectDevice(bob->id.second, "dumyName", [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
241 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400242 if (socket) {
243 isAlicConnected = true;
244 }
Morteza Namvar82960b32023-07-04 17:08:22 -0400245 alicConVar.notify_one();
Adrien Béraud4796de12023-09-25 14:46:47 -0400246 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400247
Adrien Béraud4796de12023-09-25 14:46:47 -0400248 std::unique_lock<std::mutex> lock {mtx};
249 CPPUNIT_ASSERT(bobConVar.wait_for(lock, 30s, [&] { return isBobRecvChanlReq; }));
250 CPPUNIT_ASSERT(alicConVar.wait_for(lock, 30s, [&] { return isAlicConnected; }));
Adrien Béraudefe27372023-05-27 18:56:29 -0400251}
252
Amnab8c33bb2023-08-03 14:40:01 -0400253void
254ConnectionManagerTest::testAcceptConnection()
255{
Amnab8c33bb2023-08-03 14:40:01 -0400256 std::unique_lock<std::mutex> lk {mtx};
257 std::condition_variable cv;
258 bool successfullyConnected = false;
259 bool successfullyReceive = false;
260 bool receiverConnected = false;
Adrien Béraudefe27372023-05-27 18:56:29 -0400261
Amnab8c33bb2023-08-03 14:40:01 -0400262 bob->connectionManager->onChannelRequest(
263 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
264 const std::string& name) {
265 successfullyReceive = name == "git://*";
266 return true;
267 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400268
Amnab8c33bb2023-08-03 14:40:01 -0400269 bob->connectionManager->onConnectionReady(
270 [&receiverConnected](const DeviceId&,
271 const std::string& name,
272 std::shared_ptr<ChannelSocket> socket) {
273 receiverConnected = socket && (name == "git://*");
274 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400275
Amnab8c33bb2023-08-03 14:40:01 -0400276 alice->connectionManager->connectDevice(bob->id.second,
277 "git://*",
278 [&](std::shared_ptr<ChannelSocket> socket,
279 const DeviceId&) {
280 if (socket) {
281 successfullyConnected = true;
282 }
283 cv.notify_one();
284 });
285 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
286 return successfullyReceive && successfullyConnected && receiverConnected;
287 }));
288}
Adrien Béraudefe27372023-05-27 18:56:29 -0400289
Amnab8c33bb2023-08-03 14:40:01 -0400290void
291ConnectionManagerTest::testDeclineConnection()
292{
Amnab8c33bb2023-08-03 14:40:01 -0400293 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
294 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400295
Amnab8c33bb2023-08-03 14:40:01 -0400296 std::unique_lock<std::mutex> lk {mtx};
297 std::condition_variable cv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400298 bool connectCompleted = false;
Amnab8c33bb2023-08-03 14:40:01 -0400299 bool successfullyConnected = false;
300 bool successfullyReceive = false;
301 bool receiverConnected = false;
Adrien Béraudefe27372023-05-27 18:56:29 -0400302
Amnab8c33bb2023-08-03 14:40:01 -0400303 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400304 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400305 const std::string&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400306 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400307 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400308 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400309 return false;
310 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400311
Amnab8c33bb2023-08-03 14:40:01 -0400312 bob->connectionManager->onConnectionReady(
313 [&receiverConnected](const DeviceId&,
314 const std::string&,
315 std::shared_ptr<ChannelSocket> socket) {
316 if (socket)
317 receiverConnected = true;
318 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400319
Amnab8c33bb2023-08-03 14:40:01 -0400320 alice->connectionManager->connectDevice(bob->id.second,
321 "git://*",
322 [&](std::shared_ptr<ChannelSocket> socket,
323 const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400324 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400325 if (socket) {
326 successfullyConnected = true;
327 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400328 connectCompleted = true;
Amnab8c33bb2023-08-03 14:40:01 -0400329 cv.notify_one();
330 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400331 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; }));
332 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return connectCompleted; }));
Amnab8c33bb2023-08-03 14:40:01 -0400333 CPPUNIT_ASSERT(!successfullyConnected);
334 CPPUNIT_ASSERT(!receiverConnected);
335}
Adrien Béraudefe27372023-05-27 18:56:29 -0400336
Adrien Béraud8d787732023-10-16 12:58:17 -0400337
338void
339ConnectionManagerTest::testManyChannels()
340{
341 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
342 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
343
344 std::condition_variable cv;
345 size_t successfullyConnected = 0;
346 size_t accepted = 0;
347 size_t receiverConnected = 0;
348 size_t successfullyReceived = 0;
349 size_t shutdownCount = 0;
350
351 auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
352 if (name.empty()) return false;
353 std::lock_guard<std::mutex> lk {mtx};
354 accepted++;
355 cv.notify_one();
356 return true;
357 };
358 bob->connectionManager->onChannelRequest(acceptAll);
359 alice->connectionManager->onChannelRequest(acceptAll);
360
361 auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
362 if (not socket or name.empty()) return;
363 if (socket->isInitiator())
364 return;
365 socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) {
366 rxbuf->insert(rxbuf->end(), data, data + size);
367 if (rxbuf->size() == 32) {
368 if (auto socket = w.lock()) {
369 std::error_code ec;
370 socket->write(rxbuf->data(), rxbuf->size(), ec);
371 CPPUNIT_ASSERT(!ec);
372 socket->shutdown();
373 }
374 }
375 return size;
376 });
377 std::lock_guard<std::mutex> lk {mtx};
378 receiverConnected++;
379 cv.notify_one();
380 };
381 bob->connectionManager->onConnectionReady(onReady);
382 alice->connectionManager->onConnectionReady(onReady);
383
384 // max supported number of channels per side (64k - 2 reserved channels)
385 static constexpr size_t N = 1024 * 32 - 1;
386
387 auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
388 CPPUNIT_ASSERT(socket);
389 if (socket) {
390 std::lock_guard<std::mutex> lk {mtx};
391 successfullyConnected++;
392 cv.notify_one();
393 }
394 auto data_sent = dht::PkId::get(socket->name());
395 socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) {
396 rxbuf->insert(rxbuf->end(), data, data + size);
397 if (rxbuf->size() == 32) {
398 CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size()));
399 std::lock_guard<std::mutex> lk {mtx};
400 successfullyReceived++;
401 cv.notify_one();
402 }
403 return size;
404 });
405 socket->onShutdown([&]() {
406 std::lock_guard<std::mutex> lk {mtx};
407 shutdownCount++;
408 cv.notify_one();
409 });
410 std::error_code ec;
411 socket->write(data_sent.data(), data_sent.size(), ec);
412 CPPUNIT_ASSERT(!ec);
413 };
414
415 for (size_t i = 0; i < N; ++i) {
416 alice->connectionManager->connectDevice(bob->id.second,
417 fmt::format("git://{}", i+1),
418 onConnect);
419
420 bob->connectionManager->connectDevice(alice->id.second,
421 fmt::format("sip://{}", i+1),
422 onConnect);
423
424 if (i % 128 == 0)
425 std::this_thread::sleep_for(5ms);
426 }
427
428 std::unique_lock<std::mutex> lk {mtx};
429 cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; });
430 CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected);
431 cv.wait_for(lk, 30s, [&] { return accepted == N * 2; });
432 CPPUNIT_ASSERT_EQUAL(N * 2, accepted);
433 cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; });
434 CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected);
435 cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; });
436 CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived);
437 cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; });
438 CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount);
439 lk.unlock();
440
441 // Wait a bit to let at least some channels shutdown
442 std::this_thread::sleep_for(10ms);
443
444 // Second time to make sure we can re-use the channels after shutdown
445 for (size_t i = 0; i < N; ++i) {
446 alice->connectionManager->connectDevice(bob->id.second,
447 fmt::format("git://{}", N+i+1),
448 onConnect);
449
450 bob->connectionManager->connectDevice(alice->id.second,
451 fmt::format("sip://{}", N+i+1),
452 onConnect);
453
454 if (i % 128 == 0)
455 std::this_thread::sleep_for(5ms);
456 }
457
458 lk.lock();
459 cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; });
460 CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected);
461 cv.wait_for(lk, 30s, [&] { return accepted == N * 4; });
462 CPPUNIT_ASSERT_EQUAL(N * 4, accepted);
463 cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; });
464 CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected);
465 cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; });
466 CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived);
467 cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; });
468 CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount);
469}
470
Amnab8c33bb2023-08-03 14:40:01 -0400471void
472ConnectionManagerTest::testMultipleChannels()
473{
Amnab8c33bb2023-08-03 14:40:01 -0400474 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
475 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400476
Amnab8c33bb2023-08-03 14:40:01 -0400477 std::condition_variable cv;
478 bool successfullyConnected = false;
479 bool successfullyConnected2 = false;
480 int receiverConnected = 0;
Adrien Béraudefe27372023-05-27 18:56:29 -0400481
Amnab8c33bb2023-08-03 14:40:01 -0400482 bob->connectionManager->onChannelRequest(
483 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400484
Amnab8c33bb2023-08-03 14:40:01 -0400485 bob->connectionManager->onConnectionReady(
Adrien Béraud4796de12023-09-25 14:46:47 -0400486 [&](const DeviceId&, const std::string& name,
Amnab8c33bb2023-08-03 14:40:01 -0400487 std::shared_ptr<ChannelSocket> socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400488 if (not name.empty()) {
489 std::lock_guard<std::mutex> lk {mtx};
490 if (socket)
491 receiverConnected += 1;
492 cv.notify_one();
493 }
Amnab8c33bb2023-08-03 14:40:01 -0400494 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400495
Amnab8c33bb2023-08-03 14:40:01 -0400496 alice->connectionManager->connectDevice(bob->id.second,
497 "git://*",
498 [&](std::shared_ptr<ChannelSocket> socket,
499 const DeviceId&) {
500 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400501 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400502 successfullyConnected = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400503 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400504 }
Amnab8c33bb2023-08-03 14:40:01 -0400505 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400506
Amnab8c33bb2023-08-03 14:40:01 -0400507 alice->connectionManager->connectDevice(bob->id.second,
508 "sip://*",
509 [&](std::shared_ptr<ChannelSocket> socket,
510 const DeviceId&) {
511 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400512 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400513 successfullyConnected2 = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400514 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400515 }
Amnab8c33bb2023-08-03 14:40:01 -0400516 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400517
Adrien Béraud4796de12023-09-25 14:46:47 -0400518 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400519 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
520 return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
521 }));
522 CPPUNIT_ASSERT(alice->connectionManager->activeSockets() == 1);
523}
Adrien Béraudefe27372023-05-27 18:56:29 -0400524
Amnab8c33bb2023-08-03 14:40:01 -0400525void
526ConnectionManagerTest::testMultipleChannelsOneDeclined()
527{
Amnab8c33bb2023-08-03 14:40:01 -0400528 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
529 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400530
Amnab8c33bb2023-08-03 14:40:01 -0400531 std::unique_lock<std::mutex> lk {mtx};
532 std::condition_variable cv;
533 bool successfullyNotConnected = false;
534 bool successfullyConnected2 = false;
535 int receiverConnected = 0;
Adrien Béraudefe27372023-05-27 18:56:29 -0400536
Amnab8c33bb2023-08-03 14:40:01 -0400537 bob->connectionManager->onChannelRequest(
538 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
539 if (name == "git://*")
540 return false;
541 return true;
542 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400543
Amnab8c33bb2023-08-03 14:40:01 -0400544 bob->connectionManager->onConnectionReady(
545 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
546 if (socket)
547 receiverConnected += 1;
548 cv.notify_one();
549 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400550
Amnab8c33bb2023-08-03 14:40:01 -0400551 alice->connectionManager->connectDevice(bob->id.second,
552 "git://*",
553 [&](std::shared_ptr<ChannelSocket> socket,
554 const DeviceId&) {
555 if (!socket)
556 successfullyNotConnected = true;
557 cv.notify_one();
558 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400559
Amnab8c33bb2023-08-03 14:40:01 -0400560 alice->connectionManager->connectDevice(bob->id.second,
561 "sip://*",
562 [&](std::shared_ptr<ChannelSocket> socket,
563 const DeviceId&) {
564 if (socket)
565 successfullyConnected2 = true;
566 cv.notify_one();
567 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400568
Amnab8c33bb2023-08-03 14:40:01 -0400569 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
570 return successfullyNotConnected && successfullyConnected2 && receiverConnected == 1;
571 }));
572 CPPUNIT_ASSERT(alice->connectionManager->activeSockets() == 1);
573}
Morteza Namvar82960b32023-07-04 17:08:22 -0400574
Amnab8c33bb2023-08-03 14:40:01 -0400575void
576ConnectionManagerTest::testMultipleChannelsSameName()
577{
Amnab8c33bb2023-08-03 14:40:01 -0400578 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
579 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400580
Amnab8c33bb2023-08-03 14:40:01 -0400581 std::unique_lock<std::mutex> lk {mtx};
582 std::condition_variable cv;
583 bool successfullyConnected = false;
584 bool successfullyConnected2 = false;
585 int receiverConnected = 0;
Morteza Namvar82960b32023-07-04 17:08:22 -0400586
Amnab8c33bb2023-08-03 14:40:01 -0400587 bob->connectionManager->onChannelRequest(
588 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400589
Amnab8c33bb2023-08-03 14:40:01 -0400590 bob->connectionManager->onConnectionReady(
591 [&receiverConnected](const DeviceId&,
592 const std::string&,
593 std::shared_ptr<ChannelSocket> socket) {
594 if (socket)
595 receiverConnected += 1;
596 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400597
Amnab8c33bb2023-08-03 14:40:01 -0400598 alice->connectionManager->connectDevice(bob->id.second,
599 "git://*",
600 [&](std::shared_ptr<ChannelSocket> socket,
601 const DeviceId&) {
602 if (socket) {
603 successfullyConnected = true;
604 }
605 cv.notify_one();
606 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400607
Amnab8c33bb2023-08-03 14:40:01 -0400608 // We can open two sockets with the same name, it will be two different channel
609 alice->connectionManager->connectDevice(bob->id.second,
610 "git://*",
611 [&](std::shared_ptr<ChannelSocket> socket,
612 const DeviceId&) {
613 if (socket) {
614 successfullyConnected2 = true;
615 }
616 cv.notify_one();
617 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400618
Amnab8c33bb2023-08-03 14:40:01 -0400619 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
620 return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
621 }));
622}
Morteza Namvar82960b32023-07-04 17:08:22 -0400623
Amnab8c33bb2023-08-03 14:40:01 -0400624void
625ConnectionManagerTest::testSendReceiveData()
626{
Amnab8c33bb2023-08-03 14:40:01 -0400627 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
628 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400629
Amnab8c33bb2023-08-03 14:40:01 -0400630 std::unique_lock<std::mutex> lk {mtx};
631 std::condition_variable cv;
632 std::atomic_int events(0);
633 bool successfullyConnected = false, successfullyConnected2 = false, successfullyReceive = false,
634 receiverConnected = false;
635 const uint8_t buf_other[] = {0x64, 0x65, 0x66, 0x67};
636 const uint8_t buf_test[] = {0x68, 0x69, 0x70, 0x71};
637 bool dataOk = false, dataOk2 = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400638
Amnab8c33bb2023-08-03 14:40:01 -0400639 bob->connectionManager->onChannelRequest(
640 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
641 const std::string&) {
642 successfullyReceive = true;
643 return true;
644 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400645
Amnab8c33bb2023-08-03 14:40:01 -0400646 bob->connectionManager->onConnectionReady(
647 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
648 if (socket && (name == "test" || name == "other")) {
649 receiverConnected = true;
650 std::error_code ec;
651 auto res = socket->waitForData(std::chrono::milliseconds(5000), ec);
652 if (res == 4) {
653 uint8_t buf[4];
654 socket->read(&buf[0], 4, ec);
655 if (name == "test")
656 dataOk = std::equal(std::begin(buf), std::end(buf), std::begin(buf_test));
657 else
658 dataOk2 = std::equal(std::begin(buf), std::end(buf), std::begin(buf_other));
659 events++;
660 cv.notify_one();
661 }
662 }
663 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400664
Amnab8c33bb2023-08-03 14:40:01 -0400665 alice->connectionManager->connectDevice(bob->id.second,
666 "test",
667 [&](std::shared_ptr<ChannelSocket> socket,
668 const DeviceId&) {
669 if (socket) {
670 successfullyConnected = true;
671 std::error_code ec;
672 socket->write(&buf_test[0], 4, ec);
673 }
674 events++;
675 cv.notify_one();
676 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400677
Amnab8c33bb2023-08-03 14:40:01 -0400678 alice->connectionManager->connectDevice(bob->id.second,
679 "other",
680 [&](std::shared_ptr<ChannelSocket> socket,
681 const DeviceId&) {
682 if (socket) {
683 successfullyConnected2 = true;
684 std::error_code ec;
685 socket->write(&buf_other[0], 4, ec);
686 }
687 events++;
688 cv.notify_one();
689 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400690
Amnab8c33bb2023-08-03 14:40:01 -0400691 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
692 return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2
693 && dataOk && dataOk2;
694 }));
695}
Morteza Namvar82960b32023-07-04 17:08:22 -0400696
Amnab8c33bb2023-08-03 14:40:01 -0400697void
698ConnectionManagerTest::testAcceptsICERequest()
699{
Amnab8c33bb2023-08-03 14:40:01 -0400700 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400701
Amnab8c33bb2023-08-03 14:40:01 -0400702 std::unique_lock<std::mutex> lk {mtx};
703 std::condition_variable cv;
704 bool successfullyConnected = false;
705 bool successfullyReceive = false;
706 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400707
Amnab8c33bb2023-08-03 14:40:01 -0400708 bob->connectionManager->onChannelRequest(
709 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
710 bob->connectionManager->onICERequest([&](const DeviceId&) {
711 successfullyReceive = true;
712 return true;
713 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400714
Amnab8c33bb2023-08-03 14:40:01 -0400715 bob->connectionManager->onConnectionReady(
716 [&receiverConnected](const DeviceId&,
717 const std::string& name,
718 std::shared_ptr<ChannelSocket> socket) {
719 receiverConnected = socket && (name == "git://*");
720 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400721
Amnab8c33bb2023-08-03 14:40:01 -0400722 alice->connectionManager->connectDevice(bob->id.second,
723 "git://*",
724 [&](std::shared_ptr<ChannelSocket> socket,
725 const DeviceId&) {
726 if (socket) {
727 successfullyConnected = true;
728 }
729 cv.notify_one();
730 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400731
Amnab8c33bb2023-08-03 14:40:01 -0400732 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
733 return successfullyReceive && successfullyConnected && receiverConnected;
734 }));
735}
Morteza Namvar82960b32023-07-04 17:08:22 -0400736
Amnab8c33bb2023-08-03 14:40:01 -0400737void
738ConnectionManagerTest::testDeclineICERequest()
739{
Amnab8c33bb2023-08-03 14:40:01 -0400740 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400741
Amnab8c33bb2023-08-03 14:40:01 -0400742 std::condition_variable cv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400743 bool connectCompleted = false;
Amnab8c33bb2023-08-03 14:40:01 -0400744 bool successfullyConnected = false;
745 bool successfullyReceive = false;
746 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400747
Amnab8c33bb2023-08-03 14:40:01 -0400748 bob->connectionManager->onChannelRequest(
749 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
750 bob->connectionManager->onICERequest([&](const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400751 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400752 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400753 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400754 return false;
755 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400756
Amnab8c33bb2023-08-03 14:40:01 -0400757 bob->connectionManager->onConnectionReady(
758 [&receiverConnected](const DeviceId&,
759 const std::string& name,
760 std::shared_ptr<ChannelSocket> socket) {
761 receiverConnected = socket && (name == "git://*");
762 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400763
Amnab8c33bb2023-08-03 14:40:01 -0400764 alice->connectionManager->connectDevice(bob->id.second,
765 "git://*",
766 [&](std::shared_ptr<ChannelSocket> socket,
767 const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400768 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400769 if (socket) {
770 successfullyConnected = true;
771 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400772 connectCompleted = true;
Amnab8c33bb2023-08-03 14:40:01 -0400773 cv.notify_one();
774 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400775
Adrien Béraud4796de12023-09-25 14:46:47 -0400776 std::unique_lock<std::mutex> lk {mtx};
777 CPPUNIT_ASSERT(cv.wait_for(lk, 35s, [&] { return successfullyReceive; }));
778 CPPUNIT_ASSERT(cv.wait_for(lk, 35s, [&] { return connectCompleted; }));
Amnab8c33bb2023-08-03 14:40:01 -0400779 CPPUNIT_ASSERT(!receiverConnected);
780 CPPUNIT_ASSERT(!successfullyConnected);
781}
Morteza Namvar82960b32023-07-04 17:08:22 -0400782
Amnab8c33bb2023-08-03 14:40:01 -0400783void
784ConnectionManagerTest::testChannelRcvShutdown()
785{
Amnab8c33bb2023-08-03 14:40:01 -0400786 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
787 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400788
Amnab8c33bb2023-08-03 14:40:01 -0400789 std::unique_lock<std::mutex> lk {mtx};
790 std::condition_variable cv;
791 bool successfullyConnected = false;
792 bool shutdownReceived = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400793
Amnab8c33bb2023-08-03 14:40:01 -0400794 std::shared_ptr<ChannelSocket> bobSock;
Morteza Namvar82960b32023-07-04 17:08:22 -0400795
Amnab8c33bb2023-08-03 14:40:01 -0400796 bob->connectionManager->onChannelRequest(
797 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400798
Amnab8c33bb2023-08-03 14:40:01 -0400799 bob->connectionManager->onConnectionReady(
800 [&](const DeviceId& did, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
801 if (socket && name == "git://*" && did != bob->id.second->getLongId()) {
802 bobSock = socket;
803 cv.notify_one();
804 }
805 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400806
Amnab8c33bb2023-08-03 14:40:01 -0400807 alice->connectionManager->connectDevice(bob->id.second,
808 "git://*",
809 [&](std::shared_ptr<ChannelSocket> socket,
810 const DeviceId&) {
811 if (socket) {
812 socket->onShutdown([&] {
813 shutdownReceived = true;
814 cv.notify_one();
815 });
816 successfullyConnected = true;
817 cv.notify_one();
818 }
819 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400820
Amnab8c33bb2023-08-03 14:40:01 -0400821 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return bobSock && successfullyConnected; }));
Morteza Namvar82960b32023-07-04 17:08:22 -0400822
Amnab8c33bb2023-08-03 14:40:01 -0400823 bobSock->shutdown();
Morteza Namvar82960b32023-07-04 17:08:22 -0400824
Amnab8c33bb2023-08-03 14:40:01 -0400825 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shutdownReceived; }));
826}
Morteza Namvar82960b32023-07-04 17:08:22 -0400827
Amnab8c33bb2023-08-03 14:40:01 -0400828void
829ConnectionManagerTest::testChannelSenderShutdown()
830{
Amnab8c33bb2023-08-03 14:40:01 -0400831 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
832 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400833
Amnab8c33bb2023-08-03 14:40:01 -0400834 std::condition_variable rcv, scv;
835 bool successfullyConnected = false;
836 bool successfullyReceive = false;
837 bool receiverConnected = false;
838 bool shutdownReceived = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400839
Amnab8c33bb2023-08-03 14:40:01 -0400840 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400841 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400842 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400843 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400844 successfullyReceive = name == "git://*";
Adrien Béraud4796de12023-09-25 14:46:47 -0400845 rcv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400846 return true;
847 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400848
Amnab8c33bb2023-08-03 14:40:01 -0400849 bob->connectionManager->onConnectionReady(
850 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
851 if (socket) {
852 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400853 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400854 shutdownReceived = true;
855 scv.notify_one();
856 });
857 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400858 if (not name.empty()) {
859 std::lock_guard<std::mutex> lk {mtx};
860 receiverConnected = socket && (name == "git://*");
861 rcv.notify_one();
862 }
Amnab8c33bb2023-08-03 14:40:01 -0400863 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400864
Amnab8c33bb2023-08-03 14:40:01 -0400865 alice->connectionManager->connectDevice(bob->id.second,
866 "git://*",
867 [&](std::shared_ptr<ChannelSocket> socket,
868 const DeviceId&) {
869 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400870 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400871 successfullyConnected = true;
872 rcv.notify_one();
873 socket->shutdown();
874 }
875 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400876
Adrien Béraud4796de12023-09-25 14:46:47 -0400877 std::unique_lock<std::mutex> lk {mtx};
878 rcv.wait_for(lk, 30s, [&] { return successfullyConnected && successfullyReceive && receiverConnected; });
879 scv.wait_for(lk, 30s, [&] { return shutdownReceived; });
Amnab8c33bb2023-08-03 14:40:01 -0400880}
Morteza Namvar82960b32023-07-04 17:08:22 -0400881
Amnab8c33bb2023-08-03 14:40:01 -0400882void
883ConnectionManagerTest::testCloseConnectionWith()
884{
Amnab8c33bb2023-08-03 14:40:01 -0400885 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
886 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400887
Amnab8c33bb2023-08-03 14:40:01 -0400888 auto bobUri = bob->id.second->issuer->getId().toString();
Amnab8c33bb2023-08-03 14:40:01 -0400889 std::condition_variable rcv, scv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400890 unsigned events(0);
Amnab8c33bb2023-08-03 14:40:01 -0400891 bool successfullyConnected = false;
892 bool successfullyReceive = false;
893 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400894
Amnab8c33bb2023-08-03 14:40:01 -0400895 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400896 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400897 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400898 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400899 successfullyReceive = name == "git://*";
900 return true;
901 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400902
Amnab8c33bb2023-08-03 14:40:01 -0400903 bob->connectionManager->onConnectionReady([&](const DeviceId&,
904 const std::string& name,
905 std::shared_ptr<dhtnet::ChannelSocket> socket) {
906 if (socket) {
907 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400908 std::lock_guard<std::mutex> lk {mtx};
909 events++;
Amnab8c33bb2023-08-03 14:40:01 -0400910 scv.notify_one();
911 });
912 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400913 if (not name.empty()) {
914 std::lock_guard<std::mutex> lk {mtx};
915 receiverConnected = socket && (name == "git://*");
916 rcv.notify_one();
917 }
Amnab8c33bb2023-08-03 14:40:01 -0400918 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400919
Adrien Béraud4796de12023-09-25 14:46:47 -0400920 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -0400921 "git://*",
922 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -0400923 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -0400924 if (socket) {
925 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400926 std::lock_guard<std::mutex> lk {mtx};
927 events++;
Amnab8c33bb2023-08-03 14:40:01 -0400928 scv.notify_one();
929 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400930 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400931 successfullyConnected = true;
932 rcv.notify_one();
933 }
934 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400935
Adrien Béraud4796de12023-09-25 14:46:47 -0400936 {
937 std::unique_lock<std::mutex> lk {mtx};
938 rcv.wait_for(lk, 30s, [&] {
939 return successfullyReceive && successfullyConnected && receiverConnected;
940 });
941 }
942 std::this_thread::sleep_for(1s);
Amnab8c33bb2023-08-03 14:40:01 -0400943 // This should trigger onShutdown
944 alice->connectionManager->closeConnectionsWith(bobUri);
Adrien Béraud4796de12023-09-25 14:46:47 -0400945 std::unique_lock<std::mutex> lk {mtx};
946 CPPUNIT_ASSERT(scv.wait_for(lk, 10s, [&] { return events == 2; }));
Amnab8c33bb2023-08-03 14:40:01 -0400947}
Morteza Namvar82960b32023-07-04 17:08:22 -0400948
Amnab8c33bb2023-08-03 14:40:01 -0400949// explain algorithm
950void
951ConnectionManagerTest::testShutdownCallbacks()
952{
Amnab8c33bb2023-08-03 14:40:01 -0400953 auto aliceUri = alice->id.second->issuer->getId().toString();
Morteza Namvar82960b32023-07-04 17:08:22 -0400954
Amnab8c33bb2023-08-03 14:40:01 -0400955 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
956 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400957
Amnab8c33bb2023-08-03 14:40:01 -0400958 std::condition_variable rcv, chan2cv;
959 bool successfullyConnected = false;
960 bool successfullyReceive = false;
961 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400962
Amnab8c33bb2023-08-03 14:40:01 -0400963 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400964 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
Amnab8c33bb2023-08-03 14:40:01 -0400965 if (name == "1") {
Adrien Béraud4796de12023-09-25 14:46:47 -0400966 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400967 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400968 rcv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400969 } else {
970 chan2cv.notify_one();
971 // Do not return directly. Let the connection be closed
972 std::this_thread::sleep_for(10s);
973 }
974 return true;
975 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400976
Amnab8c33bb2023-08-03 14:40:01 -0400977 bob->connectionManager->onConnectionReady([&](const DeviceId&,
978 const std::string& name,
979 std::shared_ptr<dhtnet::ChannelSocket> socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400980 if (name == "1") {
981 std::unique_lock<std::mutex> lk {mtx};
982 receiverConnected = (bool)socket;
983 rcv.notify_one();
984 }
Amnab8c33bb2023-08-03 14:40:01 -0400985 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400986
Adrien Béraud4796de12023-09-25 14:46:47 -0400987 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -0400988 "1",
989 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -0400990 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -0400991 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400992 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400993 successfullyConnected = true;
994 rcv.notify_one();
995 }
996 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400997
998 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400999 // Connect first channel. This will initiate a mx sock
1000 CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] {
Amnab8c33bb2023-08-03 14:40:01 -04001001 return successfullyReceive && successfullyConnected && receiverConnected;
1002 }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001003
Amnab8c33bb2023-08-03 14:40:01 -04001004 // Connect another channel, but close the connection
1005 bool channel2NotConnected = false;
Adrien Béraud4796de12023-09-25 14:46:47 -04001006 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -04001007 "2",
1008 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -04001009 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -04001010 channel2NotConnected = !socket;
1011 rcv.notify_one();
1012 });
1013 chan2cv.wait_for(lk, 30s);
Morteza Namvar82960b32023-07-04 17:08:22 -04001014
Amnab8c33bb2023-08-03 14:40:01 -04001015 // This should trigger onShutdown for second callback
1016 bob->connectionManager->closeConnectionsWith(aliceUri);
1017 CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; }));
1018}
Morteza Namvar82960b32023-07-04 17:08:22 -04001019
Amnab8c33bb2023-08-03 14:40:01 -04001020void
1021ConnectionManagerTest::testFloodSocket()
1022{
Amnab8c33bb2023-08-03 14:40:01 -04001023 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1024 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001025
Amnab8c33bb2023-08-03 14:40:01 -04001026 std::condition_variable cv;
1027 bool successfullyConnected = false;
1028 bool successfullyReceive = false;
1029 bool receiverConnected = false;
1030 std::shared_ptr<dhtnet::ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2,
1031 sendSock3;
1032 bob->connectionManager->onChannelRequest(
1033 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
1034 const std::string& name) {
1035 successfullyReceive = name == "1";
1036 return true;
1037 });
1038 bob->connectionManager->onConnectionReady([&](const DeviceId&,
1039 const std::string& name,
1040 std::shared_ptr<dhtnet::ChannelSocket> socket) {
1041 receiverConnected = socket != nullptr;
1042 if (name == "1")
1043 rcvSock1 = socket;
1044 else if (name == "2")
1045 rcvSock2 = socket;
1046 else if (name == "3")
1047 rcvSock3 = socket;
1048 });
1049 alice->connectionManager->connectDevice(bob->id.second,
1050 "1",
1051 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1052 const DeviceId&) {
1053 if (socket) {
1054 sendSock = socket;
1055 successfullyConnected = true;
1056 }
1057 cv.notify_one();
1058 });
Adrien Béraud4796de12023-09-25 14:46:47 -04001059 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001060 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
1061 return successfullyReceive && successfullyConnected && receiverConnected;
1062 }));
1063 CPPUNIT_ASSERT(receiverConnected);
1064 successfullyConnected = false;
1065 receiverConnected = false;
1066 alice->connectionManager->connectDevice(bob->id.second,
1067 "2",
1068 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1069 const DeviceId&) {
1070 if (socket) {
1071 sendSock2 = socket;
1072 successfullyConnected = true;
1073 }
1074 cv.notify_one();
1075 });
1076 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
1077 successfullyConnected = false;
1078 receiverConnected = false;
1079 alice->connectionManager->connectDevice(bob->id.second,
1080 "3",
1081 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1082 const DeviceId&) {
1083 if (socket) {
1084 sendSock3 = socket;
1085 successfullyConnected = true;
1086 }
1087 cv.notify_one();
1088 });
1089 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
Adrien Béraud4796de12023-09-25 14:46:47 -04001090 constexpr size_t C = 8000;
Amnab8c33bb2023-08-03 14:40:01 -04001091 std::string alphabet, shouldRcv, rcv1, rcv2, rcv3;
Adrien Béraud4796de12023-09-25 14:46:47 -04001092 std::mutex mtx1, mtx2, mtx3;
Amnab8c33bb2023-08-03 14:40:01 -04001093 for (int i = 0; i < 100; ++i)
1094 alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
Adrien Béraud4796de12023-09-25 14:46:47 -04001095 auto totSize = C * alphabet.size();
1096 shouldRcv.reserve(totSize);
1097 rcv1.reserve(totSize);
1098 rcv2.reserve(totSize);
1099 rcv3.reserve(totSize);
Amnab8c33bb2023-08-03 14:40:01 -04001100 rcvSock1->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001101 std::lock_guard<std::mutex> lk {mtx1};
1102 rcv1 += std::string_view((const char*)buf, len);
1103 if (rcv1.size() == totSize)
1104 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001105 return len;
1106 });
1107 rcvSock2->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001108 std::lock_guard<std::mutex> lk {mtx2};
1109 rcv2 += std::string_view((const char*)buf, len);
1110 if (rcv2.size() == totSize)
1111 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001112 return len;
1113 });
1114 rcvSock3->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001115 std::lock_guard<std::mutex> lk {mtx3};
1116 rcv3 += std::string_view((const char*)buf, len);
1117 if (rcv3.size() == totSize)
1118 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001119 return len;
1120 });
1121 for (uint64_t i = 0; i < alphabet.size(); ++i) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001122 auto send = std::string(C, alphabet[i]);
Amnab8c33bb2023-08-03 14:40:01 -04001123 shouldRcv += send;
1124 std::error_code ec;
1125 sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1126 sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1127 sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1128 CPPUNIT_ASSERT(!ec);
1129 }
Adrien Béraud4796de12023-09-25 14:46:47 -04001130 {
1131 std::unique_lock<std::mutex> lk {mtx1};
1132 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shouldRcv == rcv1; }));
1133 }
1134 {
1135 std::unique_lock<std::mutex> lk {mtx2};
1136 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return shouldRcv == rcv2; }));
1137 }
1138 {
1139 std::unique_lock<std::mutex> lk {mtx3};
1140 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return shouldRcv == rcv3; }));
1141 }
Amnab8c33bb2023-08-03 14:40:01 -04001142}
Morteza Namvar82960b32023-07-04 17:08:22 -04001143
Amnab8c33bb2023-08-03 14:40:01 -04001144void
1145ConnectionManagerTest::testDestroyWhileSending()
1146{
1147 // Same as test before, but destroy the accounts while sending.
1148 // This test if a segfault occurs
Amnab8c33bb2023-08-03 14:40:01 -04001149 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1150 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Amnab8c33bb2023-08-03 14:40:01 -04001151 std::unique_lock<std::mutex> lk {mtx};
1152 std::condition_variable cv;
1153 bool successfullyConnected = false;
1154 bool successfullyReceive = false;
1155 bool receiverConnected = false;
1156 std::shared_ptr<ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2, sendSock3;
1157 bob->connectionManager->onChannelRequest(
1158 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
1159 const std::string& name) {
1160 successfullyReceive = name == "1";
1161 return true;
1162 });
1163 bob->connectionManager->onConnectionReady(
1164 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
1165 receiverConnected = socket != nullptr;
1166 if (name == "1")
1167 rcvSock1 = socket;
1168 else if (name == "2")
1169 rcvSock2 = socket;
1170 else if (name == "3")
1171 rcvSock3 = socket;
1172 });
1173 alice->connectionManager->connectDevice(bob->id.second,
1174 "1",
1175 [&](std::shared_ptr<ChannelSocket> socket,
1176 const DeviceId&) {
1177 if (socket) {
1178 sendSock = socket;
1179 successfullyConnected = true;
1180 }
1181 cv.notify_one();
1182 });
1183 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
1184 return successfullyReceive && successfullyConnected && receiverConnected;
1185 }));
1186 successfullyConnected = false;
1187 receiverConnected = false;
1188 alice->connectionManager->connectDevice(bob->id.second,
1189 "2",
1190 [&](std::shared_ptr<ChannelSocket> socket,
1191 const DeviceId&) {
1192 if (socket) {
1193 sendSock2 = socket;
1194 successfullyConnected = true;
1195 }
1196 cv.notify_one();
1197 });
1198 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
1199 successfullyConnected = false;
1200 receiverConnected = false;
1201 alice->connectionManager->connectDevice(bob->id.second,
1202 "3",
1203 [&](std::shared_ptr<ChannelSocket> socket,
1204 const DeviceId&) {
1205 if (socket) {
1206 sendSock3 = socket;
1207 successfullyConnected = true;
1208 }
1209 cv.notify_one();
1210 });
1211 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
Amnab8c33bb2023-08-03 14:40:01 -04001212 std::string alphabet;
1213 for (int i = 0; i < 100; ++i)
1214 alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
1215 rcvSock1->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1216 rcvSock2->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1217 rcvSock3->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1218 for (uint64_t i = 0; i < alphabet.size(); ++i) {
1219 auto send = std::string(8000, alphabet[i]);
1220 std::error_code ec;
1221 sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1222 sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1223 sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1224 CPPUNIT_ASSERT(!ec);
1225 }
Morteza Namvar82960b32023-07-04 17:08:22 -04001226
Amnab8c33bb2023-08-03 14:40:01 -04001227 // No need to wait, immediately destroy, no segfault must occurs
1228}
Morteza Namvar82960b32023-07-04 17:08:22 -04001229
Amnab8c33bb2023-08-03 14:40:01 -04001230void
1231ConnectionManagerTest::testIsConnecting()
1232{
Amnab8c33bb2023-08-03 14:40:01 -04001233 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1234 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001235
Amnab8c33bb2023-08-03 14:40:01 -04001236 std::unique_lock<std::mutex> lk {mtx};
1237 std::condition_variable cv;
1238 bool successfullyConnected = false, successfullyReceive = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001239
Amnab8c33bb2023-08-03 14:40:01 -04001240 bob->connectionManager->onChannelRequest(
1241 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) {
1242 successfullyReceive = true;
1243 cv.notify_one();
1244 std::this_thread::sleep_for(2s);
1245 return true;
1246 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001247
Amnab8c33bb2023-08-03 14:40:01 -04001248 CPPUNIT_ASSERT(!alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
Morteza Namvar82960b32023-07-04 17:08:22 -04001249
Amnab8c33bb2023-08-03 14:40:01 -04001250 alice->connectionManager->connectDevice(bob->id.second,
1251 "sip",
1252 [&](std::shared_ptr<ChannelSocket> socket,
1253 const DeviceId&) {
1254 if (socket) {
1255 successfullyConnected = true;
1256 }
1257 cv.notify_one();
1258 });
1259 // connectDevice is full async, so isConnecting will be true after a few ms.
1260 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyReceive; }));
1261 CPPUNIT_ASSERT(alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
1262 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; }));
1263 std::this_thread::sleep_for(
1264 std::chrono::milliseconds(100)); // Just to wait for the callback to finish
1265 CPPUNIT_ASSERT(!alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
1266}
Morteza Namvar82960b32023-07-04 17:08:22 -04001267
Amnab8c33bb2023-08-03 14:40:01 -04001268void
1269ConnectionManagerTest::testCanSendBeacon()
1270{
Amnab8c33bb2023-08-03 14:40:01 -04001271 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1272 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001273
Amnab8c33bb2023-08-03 14:40:01 -04001274 std::unique_lock<std::mutex> lk {mtx};
1275 std::condition_variable cv;
1276 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001277
Amnab8c33bb2023-08-03 14:40:01 -04001278 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1279 bob->connectionManager->onChannelRequest(
1280 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1281 bob->connectionManager->onConnectionReady(
1282 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1283 if (socket && socket->name() == "sip")
1284 bobSocket = socket->underlyingSocket();
1285 cv.notify_one();
1286 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001287
Amnab8c33bb2023-08-03 14:40:01 -04001288 alice->connectionManager->connectDevice(bob->id.second,
1289 "sip",
1290 [&](std::shared_ptr<ChannelSocket> socket,
1291 const DeviceId&) {
1292 if (socket) {
1293 aliceSocket = socket->underlyingSocket();
1294 successfullyConnected = true;
1295 }
1296 cv.notify_one();
1297 });
1298 // connectDevice is full async, so isConnecting will be true after a few ms.
1299 CPPUNIT_ASSERT(
1300 cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket && successfullyConnected; }));
1301 CPPUNIT_ASSERT(aliceSocket->canSendBeacon());
Morteza Namvar82960b32023-07-04 17:08:22 -04001302
Amnab8c33bb2023-08-03 14:40:01 -04001303 // Because onConnectionReady is true before version is sent, we can wait a bit
1304 // before canSendBeacon is true.
1305 auto start = std::chrono::steady_clock::now();
1306 auto aliceCanSendBeacon = false;
1307 auto bobCanSendBeacon = false;
1308 do {
1309 aliceCanSendBeacon = aliceSocket->canSendBeacon();
1310 bobCanSendBeacon = bobSocket->canSendBeacon();
1311 if (!bobCanSendBeacon || !aliceCanSendBeacon)
1312 std::this_thread::sleep_for(1s);
1313 } while ((not bobCanSendBeacon or not aliceCanSendBeacon)
1314 and std::chrono::steady_clock::now() - start < 5s);
Morteza Namvar82960b32023-07-04 17:08:22 -04001315
Amnab8c33bb2023-08-03 14:40:01 -04001316 CPPUNIT_ASSERT(bobCanSendBeacon && aliceCanSendBeacon);
1317}
Morteza Namvar82960b32023-07-04 17:08:22 -04001318
Amnab8c33bb2023-08-03 14:40:01 -04001319void
1320ConnectionManagerTest::testCannotSendBeacon()
1321{
Amnab8c33bb2023-08-03 14:40:01 -04001322 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1323 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001324
Amnab8c33bb2023-08-03 14:40:01 -04001325 std::unique_lock<std::mutex> lk {mtx};
1326 std::condition_variable cv;
1327 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001328
Amnab8c33bb2023-08-03 14:40:01 -04001329 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1330 bob->connectionManager->onChannelRequest(
1331 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1332 bob->connectionManager->onConnectionReady(
1333 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1334 if (socket && socket->name() == "sip")
1335 bobSocket = socket->underlyingSocket();
1336 cv.notify_one();
1337 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001338
Amnab8c33bb2023-08-03 14:40:01 -04001339 alice->connectionManager->connectDevice(bob->id.second,
1340 "sip",
1341 [&](std::shared_ptr<ChannelSocket> socket,
1342 const DeviceId&) {
1343 if (socket) {
1344 aliceSocket = socket->underlyingSocket();
1345 successfullyConnected = true;
1346 }
1347 cv.notify_one();
1348 });
1349 // connectDevice is full async, so isConnecting will be true after a few ms.
1350 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001351
Amnab8c33bb2023-08-03 14:40:01 -04001352 int version = 1412;
1353 bobSocket->setOnVersionCb([&](auto v) {
1354 version = v;
1355 cv.notify_one();
1356 });
1357 aliceSocket->setVersion(0);
1358 aliceSocket->sendVersion();
1359 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return version == 0; }));
1360 CPPUNIT_ASSERT(!bobSocket->canSendBeacon());
1361}
Morteza Namvar82960b32023-07-04 17:08:22 -04001362
Amnab8c33bb2023-08-03 14:40:01 -04001363void
1364ConnectionManagerTest::testConnectivityChangeTriggerBeacon()
1365{
Amnab8c33bb2023-08-03 14:40:01 -04001366 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1367 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001368
Amnab8c33bb2023-08-03 14:40:01 -04001369 std::unique_lock<std::mutex> lk {mtx};
1370 std::condition_variable cv;
1371 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001372
Amnab8c33bb2023-08-03 14:40:01 -04001373 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1374 bob->connectionManager->onChannelRequest(
1375 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1376 bob->connectionManager->onConnectionReady(
1377 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1378 if (socket && socket->name() == "sip")
1379 bobSocket = socket->underlyingSocket();
1380 cv.notify_one();
1381 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001382
Amnab8c33bb2023-08-03 14:40:01 -04001383 alice->connectionManager->connectDevice(bob->id.second,
1384 "sip",
1385 [&](std::shared_ptr<ChannelSocket> socket,
1386 const DeviceId&) {
1387 if (socket) {
1388 aliceSocket = socket->underlyingSocket();
1389 successfullyConnected = true;
1390 }
1391 cv.notify_one();
1392 });
1393 // connectDevice is full async, so isConnecting will be true after a few ms.
1394 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001395
Amnab8c33bb2023-08-03 14:40:01 -04001396 bool hasRequest = false;
1397 bobSocket->setOnBeaconCb([&](auto p) {
1398 if (p)
1399 hasRequest = true;
1400 cv.notify_one();
1401 });
1402 alice->connectionManager->connectivityChanged();
1403 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return hasRequest; }));
1404}
Morteza Namvar82960b32023-07-04 17:08:22 -04001405
Amnab8c33bb2023-08-03 14:40:01 -04001406void
1407ConnectionManagerTest::testOnNoBeaconTriggersShutdown()
1408{
Amnab8c33bb2023-08-03 14:40:01 -04001409 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1410 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001411
Amnab8c33bb2023-08-03 14:40:01 -04001412 std::unique_lock<std::mutex> lk {mtx};
1413 std::condition_variable cv;
1414 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001415
Amnab8c33bb2023-08-03 14:40:01 -04001416 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1417 bob->connectionManager->onChannelRequest(
1418 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1419 bob->connectionManager->onConnectionReady(
1420 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1421 if (socket && socket->name() == "sip")
1422 bobSocket = socket->underlyingSocket();
1423 cv.notify_one();
1424 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001425
Amnab8c33bb2023-08-03 14:40:01 -04001426 alice->connectionManager->connectDevice(bob->id.second,
1427 "sip",
1428 [&](std::shared_ptr<ChannelSocket> socket,
1429 const DeviceId&) {
1430 if (socket) {
1431 aliceSocket = socket->underlyingSocket();
1432 successfullyConnected = true;
1433 }
1434 cv.notify_one();
1435 });
1436 // connectDevice is full async, so isConnecting will be true after a few ms.
1437 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001438
Amnab8c33bb2023-08-03 14:40:01 -04001439 bool isClosed = false;
1440 aliceSocket->onShutdown([&] {
1441 isClosed = true;
1442 cv.notify_one();
1443 });
1444 bobSocket->answerToBeacon(false);
1445 alice->connectionManager->connectivityChanged();
1446 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return isClosed; }));
1447}
Morteza Namvar82960b32023-07-04 17:08:22 -04001448
Amnab8c33bb2023-08-03 14:40:01 -04001449void
1450ConnectionManagerTest::testShutdownWhileNegotiating()
1451{
Amnab8c33bb2023-08-03 14:40:01 -04001452 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001453
Amnab8c33bb2023-08-03 14:40:01 -04001454 std::unique_lock<std::mutex> lk {mtx};
1455 std::condition_variable cv;
1456 bool successfullyReceive = false;
1457 bool notConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001458
Amnab8c33bb2023-08-03 14:40:01 -04001459 bob->connectionManager->onICERequest([&](const DeviceId&) {
1460 successfullyReceive = true;
1461 cv.notify_one();
1462 return true;
1463 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001464
Amnab8c33bb2023-08-03 14:40:01 -04001465 alice->connectionManager->connectDevice(bob->id.second,
1466 "git://*",
1467 [&](std::shared_ptr<ChannelSocket> socket,
1468 const DeviceId&) {
1469 notConnected = !socket;
1470 cv.notify_one();
1471 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001472
Amnab8c33bb2023-08-03 14:40:01 -04001473 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; }));
1474 // Manager::instance().setAccountActive(alice->id.second, false, true);
Morteza Namvar82960b32023-07-04 17:08:22 -04001475
Amnab8c33bb2023-08-03 14:40:01 -04001476 // Just move destruction on another thread.
1477 // dht::threadpool::io().run([conMgr =std::move(alice->connectionManager)] {});
1478 alice->connectionManager.reset();
Morteza Namvar82960b32023-07-04 17:08:22 -04001479
Amnab8c33bb2023-08-03 14:40:01 -04001480 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return notConnected; }));
1481}
Adrien Béraud75754b22023-10-17 09:16:06 -04001482
Amnab8c33bb2023-08-03 14:40:01 -04001483void
1484ConnectionManagerTest::testGetChannelList()
1485{
Amnab8c33bb2023-08-03 14:40:01 -04001486 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
Amnab8c33bb2023-08-03 14:40:01 -04001487 std::condition_variable cv;
1488 std::unique_lock<std::mutex> lk {mtx};
1489 bool successfullyConnected = false;
1490 int receiverConnected = 0;
1491 bob->connectionManager->onChannelRequest(
1492 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1493 bob->connectionManager->onConnectionReady(
Adrien Béraud75754b22023-10-17 09:16:06 -04001494 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1495 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001496 if (socket)
1497 receiverConnected += 1;
Amnab8c33bb2023-08-03 14:40:01 -04001498 cv.notify_one();
1499 });
1500 std::string channelId;
1501 alice->connectionManager->connectDevice(bob->id.second,
1502 "git://*",
1503 [&](std::shared_ptr<ChannelSocket> socket,
1504 const DeviceId&) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001505 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001506 if (socket) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001507 channelId = fmt::format(FMT_COMPILE("{:x}"), socket->channel());
Amnab8c33bb2023-08-03 14:40:01 -04001508 successfullyConnected = true;
1509 }
Amnab8c33bb2023-08-03 14:40:01 -04001510 cv.notify_one();
1511 });
1512 CPPUNIT_ASSERT(
1513 cv.wait_for(lk, 60s, [&] { return successfullyConnected && receiverConnected == 1; }));
1514 std::vector<std::map<std::string, std::string>> expectedList = {
Adrien Béraud75754b22023-10-17 09:16:06 -04001515 {{"id", channelId}, {"name", "git://*"}}};
Amnab8c33bb2023-08-03 14:40:01 -04001516 auto connectionList = alice->connectionManager->getConnectionList();
1517 CPPUNIT_ASSERT(!connectionList.empty());
1518 const auto& connectionInfo = connectionList[0];
1519 auto it = connectionInfo.find("id");
1520 CPPUNIT_ASSERT(it != connectionInfo.end());
Adrien Béraud75754b22023-10-17 09:16:06 -04001521 auto actualList = alice->connectionManager->getChannelList(it->second);
Amnab8c33bb2023-08-03 14:40:01 -04001522 CPPUNIT_ASSERT(expectedList.size() == actualList.size());
Amnab8c33bb2023-08-03 14:40:01 -04001523 for (const auto& expectedMap : expectedList) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001524 CPPUNIT_ASSERT(std::find(actualList.begin(), actualList.end(), expectedMap)
1525 != actualList.end());
Amnab8c33bb2023-08-03 14:40:01 -04001526 }
1527}
Adrien Béraudefe27372023-05-27 18:56:29 -04001528
1529} // namespace test
Sébastien Blin464bdff2023-07-19 08:02:53 -04001530} // namespace dhtnet
Adrien Béraudefe27372023-05-27 18:56:29 -04001531
Adrien Béraud1ae60aa2023-07-07 09:55:09 -04001532JAMI_TEST_RUNNER(dhtnet::test::ConnectionManagerTest::name())