blob: 9d0b1624207e929f93994394ba39b3518a5837c9 [file] [log] [blame]
Adrien Béraudefe27372023-05-27 18:56:29 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraudefe27372023-05-27 18:56:29 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraudefe27372023-05-27 18:56:29 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraudefe27372023-05-27 18:56:29 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraudefe27372023-05-27 18:56:29 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
Adrien Béraudefe27372023-05-27 18:56:29 -040017
18#include "connectionmanager.h"
19#include "multiplexed_socket.h"
20#include "test_runner.h"
Morteza Namvar82960b32023-07-04 17:08:22 -040021#include "certstore.h"
Adrien Béraudefe27372023-05-27 18:56:29 -040022
Adrien Béraud027af2a2023-08-27 12:08:50 -040023#include <opendht/log.h>
24#include <asio/executor_work_guard.hpp>
25#include <asio/io_context.hpp>
Adrien Béraud75754b22023-10-17 09:16:06 -040026#include <fmt/compile.h>
Adrien Béraud027af2a2023-08-27 12:08:50 -040027
28#include <cppunit/TestAssert.h>
29#include <cppunit/TestFixture.h>
30#include <cppunit/extensions/HelperMacros.h>
31
32#include <condition_variable>
33#include <iostream>
34#include <filesystem>
35
Adrien Béraudefe27372023-05-27 18:56:29 -040036using namespace std::literals::chrono_literals;
37
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraudefe27372023-05-27 18:56:29 -040039namespace test {
40
Amnab8c33bb2023-08-03 14:40:01 -040041struct ConnectionHandler
42{
Morteza Namvar82960b32023-07-04 17:08:22 -040043 dht::crypto::Identity id;
44 std::shared_ptr<Logger> logger;
45 std::shared_ptr<tls::CertificateStore> certStore;
46 std::shared_ptr<dht::DhtRunner> dht;
47 std::shared_ptr<ConnectionManager> connectionManager;
48 std::shared_ptr<asio::io_context> ioContext;
Amnab8c33bb2023-08-03 14:40:01 -040049 std::shared_ptr<std::thread> ioContextRunner;
Morteza Namvar82960b32023-07-04 17:08:22 -040050};
51
Adrien Béraudefe27372023-05-27 18:56:29 -040052class ConnectionManagerTest : public CppUnit::TestFixture
53{
54public:
Adrien Béraud4796de12023-09-25 14:46:47 -040055 ConnectionManagerTest() {
56 pj_log_set_level(0);
57 pj_log_set_log_func([](int level, const char* data, int /*len*/) {});
58 // logger->debug("Using PJSIP version {} for {}", pj_get_version(), PJ_OS_NAME);
59 // logger->debug("Using GnuTLS version {}", gnutls_check_version(nullptr));
60 // logger->debug("Using OpenDHT version {}", dht::version());
61 }
Amnab8c33bb2023-08-03 14:40:01 -040062 ~ConnectionManagerTest() {}
Adrien Béraudefe27372023-05-27 18:56:29 -040063 static std::string name() { return "ConnectionManager"; }
64 void setUp();
65 void tearDown();
66
Adrien Béraud4796de12023-09-25 14:46:47 -040067 std::shared_ptr<dht::DhtRunner> bootstrap_node;
68 dht::crypto::Identity org1Id, org2Id;
69 dht::crypto::Identity aliceId, bobId;
70 dht::crypto::Identity aliceDevice1Id, bobDevice1Id;
71
Morteza Namvar82960b32023-07-04 17:08:22 -040072 std::unique_ptr<ConnectionHandler> alice;
73 std::unique_ptr<ConnectionHandler> bob;
Adrien Béraudefe27372023-05-27 18:56:29 -040074
Morteza Namvar82960b32023-07-04 17:08:22 -040075 std::mutex mtx;
76 std::shared_ptr<asio::io_context> ioContext;
Amnab8c33bb2023-08-03 14:40:01 -040077 std::shared_ptr<std::thread> ioContextRunner;
Adrien Béraud4796de12023-09-25 14:46:47 -040078 std::shared_ptr<Logger> logger = dht::log::getStdLogger();
Amna81221ad2023-09-14 17:33:26 -040079 std::shared_ptr<IceTransportFactory> factory;
Morteza Namvar82960b32023-07-04 17:08:22 -040080
Amnab8c33bb2023-08-03 14:40:01 -040081private:
Adrien Béraud4796de12023-09-25 14:46:47 -040082 std::unique_ptr<ConnectionHandler> setupHandler(const dht::crypto::Identity& id, const std::string& bootstrap = "bootstrap.jami.net");
Morteza Namvar82960b32023-07-04 17:08:22 -040083
Amnab8c33bb2023-08-03 14:40:01 -040084 void testConnectDevice();
85 void testAcceptConnection();
Adrien Béraud8d787732023-10-16 12:58:17 -040086 void testManyChannels();
Amnab8c33bb2023-08-03 14:40:01 -040087 void testMultipleChannels();
88 void testMultipleChannelsOneDeclined();
89 void testMultipleChannelsSameName();
90 void testDeclineConnection();
91 void testSendReceiveData();
92 void testAcceptsICERequest();
93 void testDeclineICERequest();
94 void testChannelRcvShutdown();
95 void testChannelSenderShutdown();
96 void testCloseConnectionWith();
97 void testShutdownCallbacks();
98 void testFloodSocket();
99 void testDestroyWhileSending();
100 void testIsConnecting();
Sébastien Blind0c92c72023-12-07 15:27:51 -0500101 void testIsConnected();
Amnab8c33bb2023-08-03 14:40:01 -0400102 void testCanSendBeacon();
103 void testCannotSendBeacon();
104 void testConnectivityChangeTriggerBeacon();
105 void testOnNoBeaconTriggersShutdown();
106 void testShutdownWhileNegotiating();
107 void testGetChannelList();
Sébastien Blind0c92c72023-12-07 15:27:51 -0500108
Adrien Béraudefe27372023-05-27 18:56:29 -0400109 CPPUNIT_TEST_SUITE(ConnectionManagerTest);
Amnab8c33bb2023-08-03 14:40:01 -0400110 CPPUNIT_TEST(testDeclineICERequest);
111 CPPUNIT_TEST(testConnectDevice);
112 CPPUNIT_TEST(testIsConnecting);
Sébastien Blind0c92c72023-12-07 15:27:51 -0500113 CPPUNIT_TEST(testIsConnected);
Amnab8c33bb2023-08-03 14:40:01 -0400114 CPPUNIT_TEST(testAcceptConnection);
115 CPPUNIT_TEST(testDeclineConnection);
Adrien Béraud8d787732023-10-16 12:58:17 -0400116 CPPUNIT_TEST(testManyChannels);
Amnab8c33bb2023-08-03 14:40:01 -0400117 CPPUNIT_TEST(testMultipleChannels);
118 CPPUNIT_TEST(testMultipleChannelsOneDeclined);
119 CPPUNIT_TEST(testMultipleChannelsSameName);
120 CPPUNIT_TEST(testSendReceiveData);
121 CPPUNIT_TEST(testAcceptsICERequest);
122 CPPUNIT_TEST(testChannelRcvShutdown);
123 CPPUNIT_TEST(testChannelSenderShutdown);
124 CPPUNIT_TEST(testCloseConnectionWith);
125 CPPUNIT_TEST(testShutdownCallbacks);
126 CPPUNIT_TEST(testFloodSocket);
127 CPPUNIT_TEST(testDestroyWhileSending);
128 CPPUNIT_TEST(testCanSendBeacon);
129 CPPUNIT_TEST(testCannotSendBeacon);
130 CPPUNIT_TEST(testConnectivityChangeTriggerBeacon);
131 CPPUNIT_TEST(testOnNoBeaconTriggersShutdown);
132 CPPUNIT_TEST(testShutdownWhileNegotiating);
133 CPPUNIT_TEST(testGetChannelList);
Adrien Béraudefe27372023-05-27 18:56:29 -0400134 CPPUNIT_TEST_SUITE_END();
135};
136
137CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(ConnectionManagerTest, ConnectionManagerTest::name());
138
Morteza Namvar82960b32023-07-04 17:08:22 -0400139std::unique_ptr<ConnectionHandler>
Adrien Béraud4796de12023-09-25 14:46:47 -0400140ConnectionManagerTest::setupHandler(const dht::crypto::Identity& id, const std::string& bootstrap)
Amnab8c33bb2023-08-03 14:40:01 -0400141{
Morteza Namvar82960b32023-07-04 17:08:22 -0400142 auto h = std::make_unique<ConnectionHandler>();
Adrien Béraud4796de12023-09-25 14:46:47 -0400143 h->id = id;
144 h->logger = {};//logger;
145 h->certStore = std::make_shared<tls::CertificateStore>(id.second->getName(), nullptr/*h->logger*/);
Amnab8c33bb2023-08-03 14:40:01 -0400146 h->ioContext = ioContext;
Amnab8c33bb2023-08-03 14:40:01 -0400147 h->ioContextRunner = ioContextRunner;
Morteza Namvar82960b32023-07-04 17:08:22 -0400148
149 dht::DhtRunner::Config dhtConfig;
150 dhtConfig.dht_config.id = h->id;
151 dhtConfig.threaded = true;
152
153 dht::DhtRunner::Context dhtContext;
Amnab8c33bb2023-08-03 14:40:01 -0400154 dhtContext.certificateStore = [c = h->certStore](const dht::InfoHash& pk_id) {
Morteza Namvar82960b32023-07-04 17:08:22 -0400155 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
156 if (auto cert = c->getCertificate(pk_id.toString()))
157 ret.emplace_back(std::move(cert));
158 return ret;
159 };
Amnab8c33bb2023-08-03 14:40:01 -0400160 // dhtContext.logger = h->logger;
Morteza Namvar82960b32023-07-04 17:08:22 -0400161
162 h->dht = std::make_shared<dht::DhtRunner>();
163 h->dht->run(dhtConfig, std::move(dhtContext));
Adrien Béraud4796de12023-09-25 14:46:47 -0400164 h->dht->bootstrap(bootstrap);
Morteza Namvar82960b32023-07-04 17:08:22 -0400165
166 auto config = std::make_shared<ConnectionManager::Config>();
167 config->dht = h->dht;
168 config->id = h->id;
169 config->ioContext = h->ioContext;
Amna81221ad2023-09-14 17:33:26 -0400170 config->factory = factory;
Adrien Béraud4796de12023-09-25 14:46:47 -0400171 // config->logger = logger;
Amna81221ad2023-09-14 17:33:26 -0400172 config->certStore = h->certStore;
Adrien Béraud4796de12023-09-25 14:46:47 -0400173 config->cachePath = std::filesystem::current_path() / id.second->getName() / "temp";
Morteza Namvar82960b32023-07-04 17:08:22 -0400174
175 h->connectionManager = std::make_shared<ConnectionManager>(config);
Amnab8c33bb2023-08-03 14:40:01 -0400176 h->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraud4796de12023-09-25 14:46:47 -0400177 h->connectionManager->onDhtConnected(h->id.first->getPublicKey());
178
Morteza Namvar82960b32023-07-04 17:08:22 -0400179 return h;
180}
181
Adrien Béraudefe27372023-05-27 18:56:29 -0400182void
183ConnectionManagerTest::setUp()
184{
Adrien Béraud4796de12023-09-25 14:46:47 -0400185 if (not org1Id.first) {
186 org1Id = dht::crypto::generateIdentity("org1");
187 org2Id = dht::crypto::generateIdentity("org2");
188 aliceId = dht::crypto::generateIdentity("alice", org1Id, 2048, true);
189 bobId = dht::crypto::generateIdentity("bob", org2Id, 2048, true);
190 aliceDevice1Id = dht::crypto::generateIdentity("aliceDevice1", aliceId);
191 bobDevice1Id = dht::crypto::generateIdentity("bobDevice1", bobId);
192 }
Adrien Béraudc631a832023-07-26 22:19:00 -0400193
Morteza Namvar82960b32023-07-04 17:08:22 -0400194 ioContext = std::make_shared<asio::io_context>();
Amnab8c33bb2023-08-03 14:40:01 -0400195 ioContextRunner = std::make_shared<std::thread>([context = ioContext]() {
Morteza Namvar82960b32023-07-04 17:08:22 -0400196 try {
197 auto work = asio::make_work_guard(*context);
198 context->run();
199 } catch (const std::exception& ex) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400200 fmt::print("Exception in ioContextRunner: {}\n", ex.what());
Morteza Namvar82960b32023-07-04 17:08:22 -0400201 }
202 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400203 bootstrap_node = std::make_shared<dht::DhtRunner>();
204 bootstrap_node->run(36432);
Sébastien Blind0c92c72023-12-07 15:27:51 -0500205
Adrien Béraud4796de12023-09-25 14:46:47 -0400206 factory = std::make_unique<IceTransportFactory>(/*logger*/);
207 alice = setupHandler(aliceDevice1Id, "127.0.0.1:36432");
208 bob = setupHandler(bobDevice1Id, "127.0.0.1:36432");
Adrien Béraudefe27372023-05-27 18:56:29 -0400209}
210
211void
212ConnectionManagerTest::tearDown()
213{
Amnab8c33bb2023-08-03 14:40:01 -0400214 // wait_for_removal_of({aliceId, bobId});
215 // Stop the io_context and join the ioContextRunner thread
Morteza Namvar82960b32023-07-04 17:08:22 -0400216 ioContext->stop();
Amnab8c33bb2023-08-03 14:40:01 -0400217
218 if (ioContextRunner && ioContextRunner->joinable()) {
219 ioContextRunner->join();
220 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400221
222 bootstrap_node.reset();
223 alice.reset();
224 bob.reset();
225 factory.reset();
Adrien Béraudefe27372023-05-27 18:56:29 -0400226}
Adrien Béraudefe27372023-05-27 18:56:29 -0400227void
Morteza Namvar82960b32023-07-04 17:08:22 -0400228ConnectionManagerTest::testConnectDevice()
Adrien Béraudefe27372023-05-27 18:56:29 -0400229{
Morteza Namvar82960b32023-07-04 17:08:22 -0400230 std::condition_variable bobConVar;
231 bool isBobRecvChanlReq = false;
Amnab8c33bb2023-08-03 14:40:01 -0400232 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400233 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400234 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400235 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400236 isBobRecvChanlReq = name == "dumyName";
237 bobConVar.notify_one();
238 return true;
239 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400240
Morteza Namvar82960b32023-07-04 17:08:22 -0400241 std::condition_variable alicConVar;
242 bool isAlicConnected = false;
Adrien Béraud4796de12023-09-25 14:46:47 -0400243 alice->connectionManager->connectDevice(bob->id.second, "dumyName", [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
244 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400245 if (socket) {
246 isAlicConnected = true;
247 }
Morteza Namvar82960b32023-07-04 17:08:22 -0400248 alicConVar.notify_one();
Adrien Béraud4796de12023-09-25 14:46:47 -0400249 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400250
Adrien Béraud4796de12023-09-25 14:46:47 -0400251 std::unique_lock<std::mutex> lock {mtx};
252 CPPUNIT_ASSERT(bobConVar.wait_for(lock, 30s, [&] { return isBobRecvChanlReq; }));
253 CPPUNIT_ASSERT(alicConVar.wait_for(lock, 30s, [&] { return isAlicConnected; }));
Adrien Béraudefe27372023-05-27 18:56:29 -0400254}
255
Amnab8c33bb2023-08-03 14:40:01 -0400256void
257ConnectionManagerTest::testAcceptConnection()
258{
Amnab8c33bb2023-08-03 14:40:01 -0400259 std::unique_lock<std::mutex> lk {mtx};
260 std::condition_variable cv;
261 bool successfullyConnected = false;
262 bool successfullyReceive = false;
263 bool receiverConnected = false;
Adrien Béraudefe27372023-05-27 18:56:29 -0400264
Amnab8c33bb2023-08-03 14:40:01 -0400265 bob->connectionManager->onChannelRequest(
266 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
267 const std::string& name) {
268 successfullyReceive = name == "git://*";
269 return true;
270 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400271
Amnab8c33bb2023-08-03 14:40:01 -0400272 bob->connectionManager->onConnectionReady(
273 [&receiverConnected](const DeviceId&,
274 const std::string& name,
275 std::shared_ptr<ChannelSocket> socket) {
276 receiverConnected = socket && (name == "git://*");
277 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400278
Amnab8c33bb2023-08-03 14:40:01 -0400279 alice->connectionManager->connectDevice(bob->id.second,
280 "git://*",
281 [&](std::shared_ptr<ChannelSocket> socket,
282 const DeviceId&) {
283 if (socket) {
284 successfullyConnected = true;
285 }
286 cv.notify_one();
287 });
288 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
289 return successfullyReceive && successfullyConnected && receiverConnected;
290 }));
291}
Adrien Béraudefe27372023-05-27 18:56:29 -0400292
Amnab8c33bb2023-08-03 14:40:01 -0400293void
294ConnectionManagerTest::testDeclineConnection()
295{
Amnab8c33bb2023-08-03 14:40:01 -0400296 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
297 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400298
Amnab8c33bb2023-08-03 14:40:01 -0400299 std::unique_lock<std::mutex> lk {mtx};
300 std::condition_variable cv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400301 bool connectCompleted = false;
Amnab8c33bb2023-08-03 14:40:01 -0400302 bool successfullyConnected = false;
303 bool successfullyReceive = false;
304 bool receiverConnected = false;
Adrien Béraudefe27372023-05-27 18:56:29 -0400305
Amnab8c33bb2023-08-03 14:40:01 -0400306 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400307 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400308 const std::string&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400309 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400310 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400311 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400312 return false;
313 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400314
Amnab8c33bb2023-08-03 14:40:01 -0400315 bob->connectionManager->onConnectionReady(
316 [&receiverConnected](const DeviceId&,
317 const std::string&,
318 std::shared_ptr<ChannelSocket> socket) {
319 if (socket)
320 receiverConnected = true;
321 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400322
Amnab8c33bb2023-08-03 14:40:01 -0400323 alice->connectionManager->connectDevice(bob->id.second,
324 "git://*",
325 [&](std::shared_ptr<ChannelSocket> socket,
326 const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400327 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400328 if (socket) {
329 successfullyConnected = true;
330 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400331 connectCompleted = true;
Amnab8c33bb2023-08-03 14:40:01 -0400332 cv.notify_one();
333 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400334 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; }));
335 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return connectCompleted; }));
Amnab8c33bb2023-08-03 14:40:01 -0400336 CPPUNIT_ASSERT(!successfullyConnected);
337 CPPUNIT_ASSERT(!receiverConnected);
338}
Adrien Béraudefe27372023-05-27 18:56:29 -0400339
Adrien Béraud8d787732023-10-16 12:58:17 -0400340
341void
342ConnectionManagerTest::testManyChannels()
343{
344 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
345 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
346
347 std::condition_variable cv;
348 size_t successfullyConnected = 0;
349 size_t accepted = 0;
350 size_t receiverConnected = 0;
351 size_t successfullyReceived = 0;
352 size_t shutdownCount = 0;
353
354 auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
355 if (name.empty()) return false;
356 std::lock_guard<std::mutex> lk {mtx};
357 accepted++;
358 cv.notify_one();
359 return true;
360 };
361 bob->connectionManager->onChannelRequest(acceptAll);
362 alice->connectionManager->onChannelRequest(acceptAll);
363
364 auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
365 if (not socket or name.empty()) return;
366 if (socket->isInitiator())
367 return;
368 socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) {
369 rxbuf->insert(rxbuf->end(), data, data + size);
370 if (rxbuf->size() == 32) {
371 if (auto socket = w.lock()) {
372 std::error_code ec;
373 socket->write(rxbuf->data(), rxbuf->size(), ec);
374 CPPUNIT_ASSERT(!ec);
375 socket->shutdown();
376 }
377 }
378 return size;
379 });
380 std::lock_guard<std::mutex> lk {mtx};
381 receiverConnected++;
382 cv.notify_one();
383 };
384 bob->connectionManager->onConnectionReady(onReady);
385 alice->connectionManager->onConnectionReady(onReady);
386
387 // max supported number of channels per side (64k - 2 reserved channels)
388 static constexpr size_t N = 1024 * 32 - 1;
389
390 auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
391 CPPUNIT_ASSERT(socket);
392 if (socket) {
393 std::lock_guard<std::mutex> lk {mtx};
394 successfullyConnected++;
395 cv.notify_one();
396 }
397 auto data_sent = dht::PkId::get(socket->name());
398 socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) {
399 rxbuf->insert(rxbuf->end(), data, data + size);
400 if (rxbuf->size() == 32) {
401 CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size()));
402 std::lock_guard<std::mutex> lk {mtx};
403 successfullyReceived++;
404 cv.notify_one();
405 }
406 return size;
407 });
408 socket->onShutdown([&]() {
409 std::lock_guard<std::mutex> lk {mtx};
410 shutdownCount++;
411 cv.notify_one();
412 });
413 std::error_code ec;
414 socket->write(data_sent.data(), data_sent.size(), ec);
415 CPPUNIT_ASSERT(!ec);
416 };
417
418 for (size_t i = 0; i < N; ++i) {
419 alice->connectionManager->connectDevice(bob->id.second,
420 fmt::format("git://{}", i+1),
421 onConnect);
422
423 bob->connectionManager->connectDevice(alice->id.second,
424 fmt::format("sip://{}", i+1),
425 onConnect);
426
427 if (i % 128 == 0)
428 std::this_thread::sleep_for(5ms);
429 }
430
431 std::unique_lock<std::mutex> lk {mtx};
432 cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; });
433 CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected);
434 cv.wait_for(lk, 30s, [&] { return accepted == N * 2; });
435 CPPUNIT_ASSERT_EQUAL(N * 2, accepted);
436 cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; });
437 CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected);
438 cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; });
439 CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived);
440 cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; });
441 CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount);
442 lk.unlock();
443
444 // Wait a bit to let at least some channels shutdown
445 std::this_thread::sleep_for(10ms);
446
447 // Second time to make sure we can re-use the channels after shutdown
448 for (size_t i = 0; i < N; ++i) {
449 alice->connectionManager->connectDevice(bob->id.second,
450 fmt::format("git://{}", N+i+1),
451 onConnect);
452
453 bob->connectionManager->connectDevice(alice->id.second,
454 fmt::format("sip://{}", N+i+1),
455 onConnect);
456
457 if (i % 128 == 0)
458 std::this_thread::sleep_for(5ms);
459 }
460
461 lk.lock();
462 cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; });
463 CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected);
464 cv.wait_for(lk, 30s, [&] { return accepted == N * 4; });
465 CPPUNIT_ASSERT_EQUAL(N * 4, accepted);
466 cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; });
467 CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected);
468 cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; });
469 CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived);
470 cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; });
471 CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount);
472}
473
Amnab8c33bb2023-08-03 14:40:01 -0400474void
475ConnectionManagerTest::testMultipleChannels()
476{
Amnab8c33bb2023-08-03 14:40:01 -0400477 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
478 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400479
Amnab8c33bb2023-08-03 14:40:01 -0400480 std::condition_variable cv;
481 bool successfullyConnected = false;
482 bool successfullyConnected2 = false;
483 int receiverConnected = 0;
Adrien Béraudefe27372023-05-27 18:56:29 -0400484
Amnab8c33bb2023-08-03 14:40:01 -0400485 bob->connectionManager->onChannelRequest(
486 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400487
Amnab8c33bb2023-08-03 14:40:01 -0400488 bob->connectionManager->onConnectionReady(
Adrien Béraud4796de12023-09-25 14:46:47 -0400489 [&](const DeviceId&, const std::string& name,
Amnab8c33bb2023-08-03 14:40:01 -0400490 std::shared_ptr<ChannelSocket> socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400491 if (not name.empty()) {
492 std::lock_guard<std::mutex> lk {mtx};
493 if (socket)
494 receiverConnected += 1;
495 cv.notify_one();
496 }
Amnab8c33bb2023-08-03 14:40:01 -0400497 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400498
Amnab8c33bb2023-08-03 14:40:01 -0400499 alice->connectionManager->connectDevice(bob->id.second,
500 "git://*",
501 [&](std::shared_ptr<ChannelSocket> socket,
502 const DeviceId&) {
503 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400504 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400505 successfullyConnected = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400506 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400507 }
Amnab8c33bb2023-08-03 14:40:01 -0400508 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400509
Amnab8c33bb2023-08-03 14:40:01 -0400510 alice->connectionManager->connectDevice(bob->id.second,
511 "sip://*",
512 [&](std::shared_ptr<ChannelSocket> socket,
513 const DeviceId&) {
514 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400515 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400516 successfullyConnected2 = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400517 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400518 }
Amnab8c33bb2023-08-03 14:40:01 -0400519 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400520
Adrien Béraud4796de12023-09-25 14:46:47 -0400521 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400522 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
523 return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
524 }));
525 CPPUNIT_ASSERT(alice->connectionManager->activeSockets() == 1);
526}
Adrien Béraudefe27372023-05-27 18:56:29 -0400527
Amnab8c33bb2023-08-03 14:40:01 -0400528void
529ConnectionManagerTest::testMultipleChannelsOneDeclined()
530{
Amnab8c33bb2023-08-03 14:40:01 -0400531 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
532 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Adrien Béraudefe27372023-05-27 18:56:29 -0400533
Amnab8c33bb2023-08-03 14:40:01 -0400534 std::unique_lock<std::mutex> lk {mtx};
535 std::condition_variable cv;
536 bool successfullyNotConnected = false;
537 bool successfullyConnected2 = false;
538 int receiverConnected = 0;
Adrien Béraudefe27372023-05-27 18:56:29 -0400539
Amnab8c33bb2023-08-03 14:40:01 -0400540 bob->connectionManager->onChannelRequest(
541 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
542 if (name == "git://*")
543 return false;
544 return true;
545 });
Adrien Béraudefe27372023-05-27 18:56:29 -0400546
Amnab8c33bb2023-08-03 14:40:01 -0400547 bob->connectionManager->onConnectionReady(
548 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
549 if (socket)
550 receiverConnected += 1;
551 cv.notify_one();
552 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400553
Amnab8c33bb2023-08-03 14:40:01 -0400554 alice->connectionManager->connectDevice(bob->id.second,
555 "git://*",
556 [&](std::shared_ptr<ChannelSocket> socket,
557 const DeviceId&) {
558 if (!socket)
559 successfullyNotConnected = true;
560 cv.notify_one();
561 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400562
Amnab8c33bb2023-08-03 14:40:01 -0400563 alice->connectionManager->connectDevice(bob->id.second,
564 "sip://*",
565 [&](std::shared_ptr<ChannelSocket> socket,
566 const DeviceId&) {
567 if (socket)
568 successfullyConnected2 = true;
569 cv.notify_one();
570 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400571
Amnab8c33bb2023-08-03 14:40:01 -0400572 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
573 return successfullyNotConnected && successfullyConnected2 && receiverConnected == 1;
574 }));
575 CPPUNIT_ASSERT(alice->connectionManager->activeSockets() == 1);
576}
Morteza Namvar82960b32023-07-04 17:08:22 -0400577
Amnab8c33bb2023-08-03 14:40:01 -0400578void
579ConnectionManagerTest::testMultipleChannelsSameName()
580{
Amnab8c33bb2023-08-03 14:40:01 -0400581 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
582 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400583
Amnab8c33bb2023-08-03 14:40:01 -0400584 std::unique_lock<std::mutex> lk {mtx};
585 std::condition_variable cv;
586 bool successfullyConnected = false;
587 bool successfullyConnected2 = false;
588 int receiverConnected = 0;
Morteza Namvar82960b32023-07-04 17:08:22 -0400589
Amnab8c33bb2023-08-03 14:40:01 -0400590 bob->connectionManager->onChannelRequest(
591 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400592
Amnab8c33bb2023-08-03 14:40:01 -0400593 bob->connectionManager->onConnectionReady(
594 [&receiverConnected](const DeviceId&,
595 const std::string&,
596 std::shared_ptr<ChannelSocket> socket) {
597 if (socket)
598 receiverConnected += 1;
599 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400600
Amnab8c33bb2023-08-03 14:40:01 -0400601 alice->connectionManager->connectDevice(bob->id.second,
602 "git://*",
603 [&](std::shared_ptr<ChannelSocket> socket,
604 const DeviceId&) {
605 if (socket) {
606 successfullyConnected = true;
607 }
608 cv.notify_one();
609 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400610
Amnab8c33bb2023-08-03 14:40:01 -0400611 // We can open two sockets with the same name, it will be two different channel
612 alice->connectionManager->connectDevice(bob->id.second,
613 "git://*",
614 [&](std::shared_ptr<ChannelSocket> socket,
615 const DeviceId&) {
616 if (socket) {
617 successfullyConnected2 = true;
618 }
619 cv.notify_one();
620 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400621
Amnab8c33bb2023-08-03 14:40:01 -0400622 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
623 return successfullyConnected && successfullyConnected2 && receiverConnected == 2;
624 }));
625}
Morteza Namvar82960b32023-07-04 17:08:22 -0400626
Amnab8c33bb2023-08-03 14:40:01 -0400627void
628ConnectionManagerTest::testSendReceiveData()
629{
Amnab8c33bb2023-08-03 14:40:01 -0400630 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
631 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400632
Amnab8c33bb2023-08-03 14:40:01 -0400633 std::unique_lock<std::mutex> lk {mtx};
634 std::condition_variable cv;
635 std::atomic_int events(0);
636 bool successfullyConnected = false, successfullyConnected2 = false, successfullyReceive = false,
637 receiverConnected = false;
638 const uint8_t buf_other[] = {0x64, 0x65, 0x66, 0x67};
639 const uint8_t buf_test[] = {0x68, 0x69, 0x70, 0x71};
640 bool dataOk = false, dataOk2 = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400641
Amnab8c33bb2023-08-03 14:40:01 -0400642 bob->connectionManager->onChannelRequest(
643 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
644 const std::string&) {
645 successfullyReceive = true;
646 return true;
647 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400648
Amnab8c33bb2023-08-03 14:40:01 -0400649 bob->connectionManager->onConnectionReady(
650 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
651 if (socket && (name == "test" || name == "other")) {
652 receiverConnected = true;
653 std::error_code ec;
654 auto res = socket->waitForData(std::chrono::milliseconds(5000), ec);
655 if (res == 4) {
656 uint8_t buf[4];
657 socket->read(&buf[0], 4, ec);
658 if (name == "test")
659 dataOk = std::equal(std::begin(buf), std::end(buf), std::begin(buf_test));
660 else
661 dataOk2 = std::equal(std::begin(buf), std::end(buf), std::begin(buf_other));
662 events++;
663 cv.notify_one();
664 }
665 }
666 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400667
Amnab8c33bb2023-08-03 14:40:01 -0400668 alice->connectionManager->connectDevice(bob->id.second,
669 "test",
670 [&](std::shared_ptr<ChannelSocket> socket,
671 const DeviceId&) {
672 if (socket) {
673 successfullyConnected = true;
674 std::error_code ec;
675 socket->write(&buf_test[0], 4, ec);
676 }
677 events++;
678 cv.notify_one();
679 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400680
Amnab8c33bb2023-08-03 14:40:01 -0400681 alice->connectionManager->connectDevice(bob->id.second,
682 "other",
683 [&](std::shared_ptr<ChannelSocket> socket,
684 const DeviceId&) {
685 if (socket) {
686 successfullyConnected2 = true;
687 std::error_code ec;
688 socket->write(&buf_other[0], 4, ec);
689 }
690 events++;
691 cv.notify_one();
692 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400693
Amnab8c33bb2023-08-03 14:40:01 -0400694 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] {
695 return events == 4 && successfullyReceive && successfullyConnected && successfullyConnected2
696 && dataOk && dataOk2;
697 }));
698}
Morteza Namvar82960b32023-07-04 17:08:22 -0400699
Amnab8c33bb2023-08-03 14:40:01 -0400700void
701ConnectionManagerTest::testAcceptsICERequest()
702{
Amnab8c33bb2023-08-03 14:40:01 -0400703 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400704
Amnab8c33bb2023-08-03 14:40:01 -0400705 std::unique_lock<std::mutex> lk {mtx};
706 std::condition_variable cv;
707 bool successfullyConnected = false;
708 bool successfullyReceive = false;
709 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400710
Amnab8c33bb2023-08-03 14:40:01 -0400711 bob->connectionManager->onChannelRequest(
712 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
713 bob->connectionManager->onICERequest([&](const DeviceId&) {
714 successfullyReceive = true;
715 return true;
716 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400717
Amnab8c33bb2023-08-03 14:40:01 -0400718 bob->connectionManager->onConnectionReady(
719 [&receiverConnected](const DeviceId&,
720 const std::string& name,
721 std::shared_ptr<ChannelSocket> socket) {
722 receiverConnected = socket && (name == "git://*");
723 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400724
Amnab8c33bb2023-08-03 14:40:01 -0400725 alice->connectionManager->connectDevice(bob->id.second,
726 "git://*",
727 [&](std::shared_ptr<ChannelSocket> socket,
728 const DeviceId&) {
729 if (socket) {
730 successfullyConnected = true;
731 }
732 cv.notify_one();
733 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400734
Amnab8c33bb2023-08-03 14:40:01 -0400735 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
736 return successfullyReceive && successfullyConnected && receiverConnected;
737 }));
738}
Morteza Namvar82960b32023-07-04 17:08:22 -0400739
Amnab8c33bb2023-08-03 14:40:01 -0400740void
741ConnectionManagerTest::testDeclineICERequest()
742{
Amnab8c33bb2023-08-03 14:40:01 -0400743 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400744
Amnab8c33bb2023-08-03 14:40:01 -0400745 std::condition_variable cv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400746 bool connectCompleted = false;
Amnab8c33bb2023-08-03 14:40:01 -0400747 bool successfullyConnected = false;
748 bool successfullyReceive = false;
749 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400750
Amnab8c33bb2023-08-03 14:40:01 -0400751 bob->connectionManager->onChannelRequest(
752 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
753 bob->connectionManager->onICERequest([&](const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400754 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400755 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400756 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400757 return false;
758 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400759
Amnab8c33bb2023-08-03 14:40:01 -0400760 bob->connectionManager->onConnectionReady(
761 [&receiverConnected](const DeviceId&,
762 const std::string& name,
763 std::shared_ptr<ChannelSocket> socket) {
764 receiverConnected = socket && (name == "git://*");
765 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400766
Amnab8c33bb2023-08-03 14:40:01 -0400767 alice->connectionManager->connectDevice(bob->id.second,
768 "git://*",
769 [&](std::shared_ptr<ChannelSocket> socket,
770 const DeviceId&) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400771 std::lock_guard<std::mutex> lock {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400772 if (socket) {
773 successfullyConnected = true;
774 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400775 connectCompleted = true;
Amnab8c33bb2023-08-03 14:40:01 -0400776 cv.notify_one();
777 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400778
Adrien Béraud4796de12023-09-25 14:46:47 -0400779 std::unique_lock<std::mutex> lk {mtx};
780 CPPUNIT_ASSERT(cv.wait_for(lk, 35s, [&] { return successfullyReceive; }));
781 CPPUNIT_ASSERT(cv.wait_for(lk, 35s, [&] { return connectCompleted; }));
Amnab8c33bb2023-08-03 14:40:01 -0400782 CPPUNIT_ASSERT(!receiverConnected);
783 CPPUNIT_ASSERT(!successfullyConnected);
784}
Morteza Namvar82960b32023-07-04 17:08:22 -0400785
Amnab8c33bb2023-08-03 14:40:01 -0400786void
787ConnectionManagerTest::testChannelRcvShutdown()
788{
Amnab8c33bb2023-08-03 14:40:01 -0400789 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
790 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400791
Amnab8c33bb2023-08-03 14:40:01 -0400792 std::unique_lock<std::mutex> lk {mtx};
793 std::condition_variable cv;
794 bool successfullyConnected = false;
795 bool shutdownReceived = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400796
Amnab8c33bb2023-08-03 14:40:01 -0400797 std::shared_ptr<ChannelSocket> bobSock;
Morteza Namvar82960b32023-07-04 17:08:22 -0400798
Amnab8c33bb2023-08-03 14:40:01 -0400799 bob->connectionManager->onChannelRequest(
800 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400801
Amnab8c33bb2023-08-03 14:40:01 -0400802 bob->connectionManager->onConnectionReady(
803 [&](const DeviceId& did, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
804 if (socket && name == "git://*" && did != bob->id.second->getLongId()) {
805 bobSock = socket;
806 cv.notify_one();
807 }
808 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400809
Amnab8c33bb2023-08-03 14:40:01 -0400810 alice->connectionManager->connectDevice(bob->id.second,
811 "git://*",
812 [&](std::shared_ptr<ChannelSocket> socket,
813 const DeviceId&) {
814 if (socket) {
815 socket->onShutdown([&] {
816 shutdownReceived = true;
817 cv.notify_one();
818 });
819 successfullyConnected = true;
820 cv.notify_one();
821 }
822 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400823
Amnab8c33bb2023-08-03 14:40:01 -0400824 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return bobSock && successfullyConnected; }));
Morteza Namvar82960b32023-07-04 17:08:22 -0400825
Amnab8c33bb2023-08-03 14:40:01 -0400826 bobSock->shutdown();
Morteza Namvar82960b32023-07-04 17:08:22 -0400827
Amnab8c33bb2023-08-03 14:40:01 -0400828 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shutdownReceived; }));
829}
Morteza Namvar82960b32023-07-04 17:08:22 -0400830
Amnab8c33bb2023-08-03 14:40:01 -0400831void
832ConnectionManagerTest::testChannelSenderShutdown()
833{
Amnab8c33bb2023-08-03 14:40:01 -0400834 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
835 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400836
Amnab8c33bb2023-08-03 14:40:01 -0400837 std::condition_variable rcv, scv;
838 bool successfullyConnected = false;
839 bool successfullyReceive = false;
840 bool receiverConnected = false;
841 bool shutdownReceived = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400842
Amnab8c33bb2023-08-03 14:40:01 -0400843 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400844 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400845 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400846 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400847 successfullyReceive = name == "git://*";
Adrien Béraud4796de12023-09-25 14:46:47 -0400848 rcv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400849 return true;
850 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400851
Amnab8c33bb2023-08-03 14:40:01 -0400852 bob->connectionManager->onConnectionReady(
853 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
854 if (socket) {
855 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400856 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400857 shutdownReceived = true;
858 scv.notify_one();
859 });
860 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400861 if (not name.empty()) {
862 std::lock_guard<std::mutex> lk {mtx};
863 receiverConnected = socket && (name == "git://*");
864 rcv.notify_one();
865 }
Amnab8c33bb2023-08-03 14:40:01 -0400866 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400867
Amnab8c33bb2023-08-03 14:40:01 -0400868 alice->connectionManager->connectDevice(bob->id.second,
869 "git://*",
870 [&](std::shared_ptr<ChannelSocket> socket,
871 const DeviceId&) {
872 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400873 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400874 successfullyConnected = true;
875 rcv.notify_one();
876 socket->shutdown();
877 }
878 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400879
Adrien Béraud4796de12023-09-25 14:46:47 -0400880 std::unique_lock<std::mutex> lk {mtx};
881 rcv.wait_for(lk, 30s, [&] { return successfullyConnected && successfullyReceive && receiverConnected; });
882 scv.wait_for(lk, 30s, [&] { return shutdownReceived; });
Amnab8c33bb2023-08-03 14:40:01 -0400883}
Morteza Namvar82960b32023-07-04 17:08:22 -0400884
Amnab8c33bb2023-08-03 14:40:01 -0400885void
886ConnectionManagerTest::testCloseConnectionWith()
887{
Amnab8c33bb2023-08-03 14:40:01 -0400888 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
889 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400890
Amnab8c33bb2023-08-03 14:40:01 -0400891 auto bobUri = bob->id.second->issuer->getId().toString();
Amnab8c33bb2023-08-03 14:40:01 -0400892 std::condition_variable rcv, scv;
Adrien Béraud4796de12023-09-25 14:46:47 -0400893 unsigned events(0);
Amnab8c33bb2023-08-03 14:40:01 -0400894 bool successfullyConnected = false;
895 bool successfullyReceive = false;
896 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400897
Amnab8c33bb2023-08-03 14:40:01 -0400898 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400899 [&](const std::shared_ptr<dht::crypto::Certificate>&,
Amnab8c33bb2023-08-03 14:40:01 -0400900 const std::string& name) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400901 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400902 successfullyReceive = name == "git://*";
903 return true;
904 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400905
Amnab8c33bb2023-08-03 14:40:01 -0400906 bob->connectionManager->onConnectionReady([&](const DeviceId&,
907 const std::string& name,
908 std::shared_ptr<dhtnet::ChannelSocket> socket) {
909 if (socket) {
910 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400911 std::lock_guard<std::mutex> lk {mtx};
912 events++;
Amnab8c33bb2023-08-03 14:40:01 -0400913 scv.notify_one();
914 });
915 }
Adrien Béraud4796de12023-09-25 14:46:47 -0400916 if (not name.empty()) {
917 std::lock_guard<std::mutex> lk {mtx};
918 receiverConnected = socket && (name == "git://*");
919 rcv.notify_one();
920 }
Amnab8c33bb2023-08-03 14:40:01 -0400921 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400922
Adrien Béraud4796de12023-09-25 14:46:47 -0400923 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -0400924 "git://*",
925 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -0400926 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -0400927 if (socket) {
928 socket->onShutdown([&] {
Adrien Béraud4796de12023-09-25 14:46:47 -0400929 std::lock_guard<std::mutex> lk {mtx};
930 events++;
Amnab8c33bb2023-08-03 14:40:01 -0400931 scv.notify_one();
932 });
Adrien Béraud4796de12023-09-25 14:46:47 -0400933 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400934 successfullyConnected = true;
935 rcv.notify_one();
936 }
937 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400938
Adrien Béraud4796de12023-09-25 14:46:47 -0400939 {
940 std::unique_lock<std::mutex> lk {mtx};
941 rcv.wait_for(lk, 30s, [&] {
942 return successfullyReceive && successfullyConnected && receiverConnected;
943 });
944 }
945 std::this_thread::sleep_for(1s);
Amnab8c33bb2023-08-03 14:40:01 -0400946 // This should trigger onShutdown
947 alice->connectionManager->closeConnectionsWith(bobUri);
Adrien Béraud4796de12023-09-25 14:46:47 -0400948 std::unique_lock<std::mutex> lk {mtx};
949 CPPUNIT_ASSERT(scv.wait_for(lk, 10s, [&] { return events == 2; }));
Amnab8c33bb2023-08-03 14:40:01 -0400950}
Morteza Namvar82960b32023-07-04 17:08:22 -0400951
Amnab8c33bb2023-08-03 14:40:01 -0400952// explain algorithm
953void
954ConnectionManagerTest::testShutdownCallbacks()
955{
Amnab8c33bb2023-08-03 14:40:01 -0400956 auto aliceUri = alice->id.second->issuer->getId().toString();
Morteza Namvar82960b32023-07-04 17:08:22 -0400957
Amnab8c33bb2023-08-03 14:40:01 -0400958 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
959 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -0400960
Amnab8c33bb2023-08-03 14:40:01 -0400961 std::condition_variable rcv, chan2cv;
962 bool successfullyConnected = false;
963 bool successfullyReceive = false;
964 bool receiverConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -0400965
Amnab8c33bb2023-08-03 14:40:01 -0400966 bob->connectionManager->onChannelRequest(
Adrien Béraud4796de12023-09-25 14:46:47 -0400967 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
Amnab8c33bb2023-08-03 14:40:01 -0400968 if (name == "1") {
Adrien Béraud4796de12023-09-25 14:46:47 -0400969 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400970 successfullyReceive = true;
Adrien Béraud4796de12023-09-25 14:46:47 -0400971 rcv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -0400972 } else {
973 chan2cv.notify_one();
974 // Do not return directly. Let the connection be closed
975 std::this_thread::sleep_for(10s);
976 }
977 return true;
978 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400979
Amnab8c33bb2023-08-03 14:40:01 -0400980 bob->connectionManager->onConnectionReady([&](const DeviceId&,
981 const std::string& name,
982 std::shared_ptr<dhtnet::ChannelSocket> socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400983 if (name == "1") {
984 std::unique_lock<std::mutex> lk {mtx};
985 receiverConnected = (bool)socket;
986 rcv.notify_one();
987 }
Amnab8c33bb2023-08-03 14:40:01 -0400988 });
Morteza Namvar82960b32023-07-04 17:08:22 -0400989
Adrien Béraud4796de12023-09-25 14:46:47 -0400990 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -0400991 "1",
992 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -0400993 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -0400994 if (socket) {
Adrien Béraud4796de12023-09-25 14:46:47 -0400995 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -0400996 successfullyConnected = true;
997 rcv.notify_one();
998 }
999 });
Sébastien Blind0c92c72023-12-07 15:27:51 -05001000
Adrien Béraud4796de12023-09-25 14:46:47 -04001001 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001002 // Connect first channel. This will initiate a mx sock
1003 CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] {
Amnab8c33bb2023-08-03 14:40:01 -04001004 return successfullyReceive && successfullyConnected && receiverConnected;
1005 }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001006
Amnab8c33bb2023-08-03 14:40:01 -04001007 // Connect another channel, but close the connection
1008 bool channel2NotConnected = false;
Adrien Béraud4796de12023-09-25 14:46:47 -04001009 alice->connectionManager->connectDevice(bob->id.second,
Amnab8c33bb2023-08-03 14:40:01 -04001010 "2",
1011 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
Adrien Béraud4796de12023-09-25 14:46:47 -04001012 const DeviceId&) {
Amnab8c33bb2023-08-03 14:40:01 -04001013 channel2NotConnected = !socket;
1014 rcv.notify_one();
1015 });
1016 chan2cv.wait_for(lk, 30s);
Morteza Namvar82960b32023-07-04 17:08:22 -04001017
Amnab8c33bb2023-08-03 14:40:01 -04001018 // This should trigger onShutdown for second callback
1019 bob->connectionManager->closeConnectionsWith(aliceUri);
1020 CPPUNIT_ASSERT(rcv.wait_for(lk, 30s, [&] { return channel2NotConnected; }));
1021}
Morteza Namvar82960b32023-07-04 17:08:22 -04001022
Amnab8c33bb2023-08-03 14:40:01 -04001023void
1024ConnectionManagerTest::testFloodSocket()
1025{
Amnab8c33bb2023-08-03 14:40:01 -04001026 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1027 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001028
Amnab8c33bb2023-08-03 14:40:01 -04001029 std::condition_variable cv;
1030 bool successfullyConnected = false;
1031 bool successfullyReceive = false;
1032 bool receiverConnected = false;
1033 std::shared_ptr<dhtnet::ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2,
1034 sendSock3;
1035 bob->connectionManager->onChannelRequest(
1036 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
1037 const std::string& name) {
1038 successfullyReceive = name == "1";
1039 return true;
1040 });
1041 bob->connectionManager->onConnectionReady([&](const DeviceId&,
1042 const std::string& name,
1043 std::shared_ptr<dhtnet::ChannelSocket> socket) {
1044 receiverConnected = socket != nullptr;
1045 if (name == "1")
1046 rcvSock1 = socket;
1047 else if (name == "2")
1048 rcvSock2 = socket;
1049 else if (name == "3")
1050 rcvSock3 = socket;
1051 });
1052 alice->connectionManager->connectDevice(bob->id.second,
1053 "1",
1054 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1055 const DeviceId&) {
1056 if (socket) {
1057 sendSock = socket;
1058 successfullyConnected = true;
1059 }
1060 cv.notify_one();
1061 });
Adrien Béraud4796de12023-09-25 14:46:47 -04001062 std::unique_lock<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001063 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
1064 return successfullyReceive && successfullyConnected && receiverConnected;
1065 }));
1066 CPPUNIT_ASSERT(receiverConnected);
1067 successfullyConnected = false;
1068 receiverConnected = false;
1069 alice->connectionManager->connectDevice(bob->id.second,
1070 "2",
1071 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1072 const DeviceId&) {
1073 if (socket) {
1074 sendSock2 = socket;
1075 successfullyConnected = true;
1076 }
1077 cv.notify_one();
1078 });
1079 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
1080 successfullyConnected = false;
1081 receiverConnected = false;
1082 alice->connectionManager->connectDevice(bob->id.second,
1083 "3",
1084 [&](std::shared_ptr<dhtnet::ChannelSocket> socket,
1085 const DeviceId&) {
1086 if (socket) {
1087 sendSock3 = socket;
1088 successfullyConnected = true;
1089 }
1090 cv.notify_one();
1091 });
1092 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
Adrien Béraud4796de12023-09-25 14:46:47 -04001093 constexpr size_t C = 8000;
Amnab8c33bb2023-08-03 14:40:01 -04001094 std::string alphabet, shouldRcv, rcv1, rcv2, rcv3;
Adrien Béraud4796de12023-09-25 14:46:47 -04001095 std::mutex mtx1, mtx2, mtx3;
Amnab8c33bb2023-08-03 14:40:01 -04001096 for (int i = 0; i < 100; ++i)
1097 alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
Adrien Béraud4796de12023-09-25 14:46:47 -04001098 auto totSize = C * alphabet.size();
1099 shouldRcv.reserve(totSize);
1100 rcv1.reserve(totSize);
1101 rcv2.reserve(totSize);
1102 rcv3.reserve(totSize);
Amnab8c33bb2023-08-03 14:40:01 -04001103 rcvSock1->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001104 std::lock_guard<std::mutex> lk {mtx1};
1105 rcv1 += std::string_view((const char*)buf, len);
1106 if (rcv1.size() == totSize)
1107 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001108 return len;
1109 });
1110 rcvSock2->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001111 std::lock_guard<std::mutex> lk {mtx2};
1112 rcv2 += std::string_view((const char*)buf, len);
1113 if (rcv2.size() == totSize)
1114 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001115 return len;
1116 });
1117 rcvSock3->setOnRecv([&](const uint8_t* buf, size_t len) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001118 std::lock_guard<std::mutex> lk {mtx3};
1119 rcv3 += std::string_view((const char*)buf, len);
1120 if (rcv3.size() == totSize)
1121 cv.notify_one();
Amnab8c33bb2023-08-03 14:40:01 -04001122 return len;
1123 });
1124 for (uint64_t i = 0; i < alphabet.size(); ++i) {
Adrien Béraud4796de12023-09-25 14:46:47 -04001125 auto send = std::string(C, alphabet[i]);
Amnab8c33bb2023-08-03 14:40:01 -04001126 shouldRcv += send;
1127 std::error_code ec;
1128 sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1129 sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1130 sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1131 CPPUNIT_ASSERT(!ec);
1132 }
Adrien Béraud4796de12023-09-25 14:46:47 -04001133 {
1134 std::unique_lock<std::mutex> lk {mtx1};
1135 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return shouldRcv == rcv1; }));
1136 }
1137 {
1138 std::unique_lock<std::mutex> lk {mtx2};
1139 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return shouldRcv == rcv2; }));
1140 }
1141 {
1142 std::unique_lock<std::mutex> lk {mtx3};
1143 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return shouldRcv == rcv3; }));
1144 }
Amnab8c33bb2023-08-03 14:40:01 -04001145}
Morteza Namvar82960b32023-07-04 17:08:22 -04001146
Amnab8c33bb2023-08-03 14:40:01 -04001147void
1148ConnectionManagerTest::testDestroyWhileSending()
1149{
1150 // Same as test before, but destroy the accounts while sending.
1151 // This test if a segfault occurs
Amnab8c33bb2023-08-03 14:40:01 -04001152 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1153 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Amnab8c33bb2023-08-03 14:40:01 -04001154 std::unique_lock<std::mutex> lk {mtx};
1155 std::condition_variable cv;
1156 bool successfullyConnected = false;
1157 bool successfullyReceive = false;
1158 bool receiverConnected = false;
1159 std::shared_ptr<ChannelSocket> rcvSock1, rcvSock2, rcvSock3, sendSock, sendSock2, sendSock3;
1160 bob->connectionManager->onChannelRequest(
1161 [&successfullyReceive](const std::shared_ptr<dht::crypto::Certificate>&,
1162 const std::string& name) {
1163 successfullyReceive = name == "1";
1164 return true;
1165 });
1166 bob->connectionManager->onConnectionReady(
1167 [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
1168 receiverConnected = socket != nullptr;
1169 if (name == "1")
1170 rcvSock1 = socket;
1171 else if (name == "2")
1172 rcvSock2 = socket;
1173 else if (name == "3")
1174 rcvSock3 = socket;
1175 });
1176 alice->connectionManager->connectDevice(bob->id.second,
1177 "1",
1178 [&](std::shared_ptr<ChannelSocket> socket,
1179 const DeviceId&) {
1180 if (socket) {
1181 sendSock = socket;
1182 successfullyConnected = true;
1183 }
1184 cv.notify_one();
1185 });
1186 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] {
1187 return successfullyReceive && successfullyConnected && receiverConnected;
1188 }));
1189 successfullyConnected = false;
1190 receiverConnected = false;
1191 alice->connectionManager->connectDevice(bob->id.second,
1192 "2",
1193 [&](std::shared_ptr<ChannelSocket> socket,
1194 const DeviceId&) {
1195 if (socket) {
1196 sendSock2 = socket;
1197 successfullyConnected = true;
1198 }
1199 cv.notify_one();
1200 });
1201 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
1202 successfullyConnected = false;
1203 receiverConnected = false;
1204 alice->connectionManager->connectDevice(bob->id.second,
1205 "3",
1206 [&](std::shared_ptr<ChannelSocket> socket,
1207 const DeviceId&) {
1208 if (socket) {
1209 sendSock3 = socket;
1210 successfullyConnected = true;
1211 }
1212 cv.notify_one();
1213 });
1214 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyConnected && receiverConnected; }));
Amnab8c33bb2023-08-03 14:40:01 -04001215 std::string alphabet;
1216 for (int i = 0; i < 100; ++i)
1217 alphabet += "QWERTYUIOPASDFGHJKLZXCVBNM";
1218 rcvSock1->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1219 rcvSock2->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1220 rcvSock3->setOnRecv([&](const uint8_t*, size_t len) { return len; });
1221 for (uint64_t i = 0; i < alphabet.size(); ++i) {
1222 auto send = std::string(8000, alphabet[i]);
1223 std::error_code ec;
1224 sendSock->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1225 sendSock2->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1226 sendSock3->write(reinterpret_cast<unsigned char*>(send.data()), send.size(), ec);
1227 CPPUNIT_ASSERT(!ec);
1228 }
Morteza Namvar82960b32023-07-04 17:08:22 -04001229
Amnab8c33bb2023-08-03 14:40:01 -04001230 // No need to wait, immediately destroy, no segfault must occurs
1231}
Morteza Namvar82960b32023-07-04 17:08:22 -04001232
Amnab8c33bb2023-08-03 14:40:01 -04001233void
1234ConnectionManagerTest::testIsConnecting()
1235{
Amnab8c33bb2023-08-03 14:40:01 -04001236 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1237 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001238
Amnab8c33bb2023-08-03 14:40:01 -04001239 std::unique_lock<std::mutex> lk {mtx};
1240 std::condition_variable cv;
1241 bool successfullyConnected = false, successfullyReceive = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001242
Amnab8c33bb2023-08-03 14:40:01 -04001243 bob->connectionManager->onChannelRequest(
1244 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) {
1245 successfullyReceive = true;
1246 cv.notify_one();
1247 std::this_thread::sleep_for(2s);
1248 return true;
1249 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001250
Amnab8c33bb2023-08-03 14:40:01 -04001251 CPPUNIT_ASSERT(!alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
Morteza Namvar82960b32023-07-04 17:08:22 -04001252
Amnab8c33bb2023-08-03 14:40:01 -04001253 alice->connectionManager->connectDevice(bob->id.second,
1254 "sip",
1255 [&](std::shared_ptr<ChannelSocket> socket,
1256 const DeviceId&) {
1257 if (socket) {
1258 successfullyConnected = true;
1259 }
1260 cv.notify_one();
1261 });
1262 // connectDevice is full async, so isConnecting will be true after a few ms.
1263 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyReceive; }));
1264 CPPUNIT_ASSERT(alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
1265 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; }));
1266 std::this_thread::sleep_for(
1267 std::chrono::milliseconds(100)); // Just to wait for the callback to finish
1268 CPPUNIT_ASSERT(!alice->connectionManager->isConnecting(bob->id.second->getLongId(), "sip"));
1269}
Morteza Namvar82960b32023-07-04 17:08:22 -04001270
Amnab8c33bb2023-08-03 14:40:01 -04001271void
Sébastien Blind0c92c72023-12-07 15:27:51 -05001272ConnectionManagerTest::testIsConnected()
1273{
1274 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1275 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
1276
1277 std::unique_lock<std::mutex> lk {mtx};
1278 std::condition_variable cv;
1279 bool successfullyConnected = false, successfullyReceive = false;
1280
1281 bob->connectionManager->onChannelRequest(
1282 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) {
1283 return true;
1284 });
1285
1286 alice->connectionManager->connectDevice(bob->id.second,
1287 "sip",
1288 [&](std::shared_ptr<ChannelSocket> socket,
1289 const DeviceId&) {
1290 if (socket) {
1291 successfullyConnected = true;
1292 }
1293 cv.notify_one();
1294 });
1295 CPPUNIT_ASSERT(cv.wait_for(lk, 60s, [&] { return successfullyConnected; }));
1296 std::this_thread::sleep_for(
1297 std::chrono::milliseconds(100)); // Just to wait for the callback to finish
1298 CPPUNIT_ASSERT(alice->connectionManager->isConnected(bob->id.second->getLongId()));
1299}
1300
1301void
Amnab8c33bb2023-08-03 14:40:01 -04001302ConnectionManagerTest::testCanSendBeacon()
1303{
Amnab8c33bb2023-08-03 14:40:01 -04001304 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1305 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001306
Amnab8c33bb2023-08-03 14:40:01 -04001307 std::unique_lock<std::mutex> lk {mtx};
1308 std::condition_variable cv;
1309 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001310
Amnab8c33bb2023-08-03 14:40:01 -04001311 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1312 bob->connectionManager->onChannelRequest(
1313 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1314 bob->connectionManager->onConnectionReady(
1315 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1316 if (socket && socket->name() == "sip")
1317 bobSocket = socket->underlyingSocket();
1318 cv.notify_one();
1319 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001320
Amnab8c33bb2023-08-03 14:40:01 -04001321 alice->connectionManager->connectDevice(bob->id.second,
1322 "sip",
1323 [&](std::shared_ptr<ChannelSocket> socket,
1324 const DeviceId&) {
1325 if (socket) {
1326 aliceSocket = socket->underlyingSocket();
1327 successfullyConnected = true;
1328 }
1329 cv.notify_one();
1330 });
1331 // connectDevice is full async, so isConnecting will be true after a few ms.
1332 CPPUNIT_ASSERT(
1333 cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket && successfullyConnected; }));
1334 CPPUNIT_ASSERT(aliceSocket->canSendBeacon());
Morteza Namvar82960b32023-07-04 17:08:22 -04001335
Amnab8c33bb2023-08-03 14:40:01 -04001336 // Because onConnectionReady is true before version is sent, we can wait a bit
1337 // before canSendBeacon is true.
1338 auto start = std::chrono::steady_clock::now();
1339 auto aliceCanSendBeacon = false;
1340 auto bobCanSendBeacon = false;
1341 do {
1342 aliceCanSendBeacon = aliceSocket->canSendBeacon();
1343 bobCanSendBeacon = bobSocket->canSendBeacon();
1344 if (!bobCanSendBeacon || !aliceCanSendBeacon)
1345 std::this_thread::sleep_for(1s);
1346 } while ((not bobCanSendBeacon or not aliceCanSendBeacon)
1347 and std::chrono::steady_clock::now() - start < 5s);
Morteza Namvar82960b32023-07-04 17:08:22 -04001348
Amnab8c33bb2023-08-03 14:40:01 -04001349 CPPUNIT_ASSERT(bobCanSendBeacon && aliceCanSendBeacon);
1350}
Morteza Namvar82960b32023-07-04 17:08:22 -04001351
Amnab8c33bb2023-08-03 14:40:01 -04001352void
1353ConnectionManagerTest::testCannotSendBeacon()
1354{
Amnab8c33bb2023-08-03 14:40:01 -04001355 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1356 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001357
Amnab8c33bb2023-08-03 14:40:01 -04001358 std::unique_lock<std::mutex> lk {mtx};
1359 std::condition_variable cv;
1360 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001361
Amnab8c33bb2023-08-03 14:40:01 -04001362 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1363 bob->connectionManager->onChannelRequest(
1364 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1365 bob->connectionManager->onConnectionReady(
1366 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1367 if (socket && socket->name() == "sip")
1368 bobSocket = socket->underlyingSocket();
1369 cv.notify_one();
1370 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001371
Amnab8c33bb2023-08-03 14:40:01 -04001372 alice->connectionManager->connectDevice(bob->id.second,
1373 "sip",
1374 [&](std::shared_ptr<ChannelSocket> socket,
1375 const DeviceId&) {
1376 if (socket) {
1377 aliceSocket = socket->underlyingSocket();
1378 successfullyConnected = true;
1379 }
1380 cv.notify_one();
1381 });
1382 // connectDevice is full async, so isConnecting will be true after a few ms.
1383 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001384
Amnab8c33bb2023-08-03 14:40:01 -04001385 int version = 1412;
1386 bobSocket->setOnVersionCb([&](auto v) {
1387 version = v;
1388 cv.notify_one();
1389 });
1390 aliceSocket->setVersion(0);
1391 aliceSocket->sendVersion();
1392 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return version == 0; }));
1393 CPPUNIT_ASSERT(!bobSocket->canSendBeacon());
1394}
Morteza Namvar82960b32023-07-04 17:08:22 -04001395
Amnab8c33bb2023-08-03 14:40:01 -04001396void
1397ConnectionManagerTest::testConnectivityChangeTriggerBeacon()
1398{
Amnab8c33bb2023-08-03 14:40:01 -04001399 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1400 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001401
Amnab8c33bb2023-08-03 14:40:01 -04001402 std::unique_lock<std::mutex> lk {mtx};
1403 std::condition_variable cv;
1404 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001405
Amnab8c33bb2023-08-03 14:40:01 -04001406 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1407 bob->connectionManager->onChannelRequest(
1408 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1409 bob->connectionManager->onConnectionReady(
1410 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1411 if (socket && socket->name() == "sip")
1412 bobSocket = socket->underlyingSocket();
1413 cv.notify_one();
1414 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001415
Amnab8c33bb2023-08-03 14:40:01 -04001416 alice->connectionManager->connectDevice(bob->id.second,
1417 "sip",
1418 [&](std::shared_ptr<ChannelSocket> socket,
1419 const DeviceId&) {
1420 if (socket) {
1421 aliceSocket = socket->underlyingSocket();
1422 successfullyConnected = true;
1423 }
1424 cv.notify_one();
1425 });
1426 // connectDevice is full async, so isConnecting will be true after a few ms.
1427 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001428
Amnab8c33bb2023-08-03 14:40:01 -04001429 bool hasRequest = false;
1430 bobSocket->setOnBeaconCb([&](auto p) {
1431 if (p)
1432 hasRequest = true;
1433 cv.notify_one();
1434 });
1435 alice->connectionManager->connectivityChanged();
1436 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return hasRequest; }));
1437}
Morteza Namvar82960b32023-07-04 17:08:22 -04001438
Amnab8c33bb2023-08-03 14:40:01 -04001439void
1440ConnectionManagerTest::testOnNoBeaconTriggersShutdown()
1441{
Amnab8c33bb2023-08-03 14:40:01 -04001442 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
1443 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001444
Amnab8c33bb2023-08-03 14:40:01 -04001445 std::unique_lock<std::mutex> lk {mtx};
1446 std::condition_variable cv;
1447 bool successfullyConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001448
Amnab8c33bb2023-08-03 14:40:01 -04001449 std::shared_ptr<MultiplexedSocket> aliceSocket, bobSocket;
1450 bob->connectionManager->onChannelRequest(
1451 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1452 bob->connectionManager->onConnectionReady(
1453 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1454 if (socket && socket->name() == "sip")
1455 bobSocket = socket->underlyingSocket();
1456 cv.notify_one();
1457 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001458
Amnab8c33bb2023-08-03 14:40:01 -04001459 alice->connectionManager->connectDevice(bob->id.second,
1460 "sip",
1461 [&](std::shared_ptr<ChannelSocket> socket,
1462 const DeviceId&) {
1463 if (socket) {
1464 aliceSocket = socket->underlyingSocket();
1465 successfullyConnected = true;
1466 }
1467 cv.notify_one();
1468 });
1469 // connectDevice is full async, so isConnecting will be true after a few ms.
1470 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return aliceSocket && bobSocket; }));
Morteza Namvar82960b32023-07-04 17:08:22 -04001471
Amnab8c33bb2023-08-03 14:40:01 -04001472 bool isClosed = false;
1473 aliceSocket->onShutdown([&] {
1474 isClosed = true;
1475 cv.notify_one();
1476 });
1477 bobSocket->answerToBeacon(false);
1478 alice->connectionManager->connectivityChanged();
1479 CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&] { return isClosed; }));
1480}
Morteza Namvar82960b32023-07-04 17:08:22 -04001481
Amnab8c33bb2023-08-03 14:40:01 -04001482void
1483ConnectionManagerTest::testShutdownWhileNegotiating()
1484{
Amnab8c33bb2023-08-03 14:40:01 -04001485 alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
Morteza Namvar82960b32023-07-04 17:08:22 -04001486
Amnab8c33bb2023-08-03 14:40:01 -04001487 std::unique_lock<std::mutex> lk {mtx};
1488 std::condition_variable cv;
1489 bool successfullyReceive = false;
1490 bool notConnected = false;
Morteza Namvar82960b32023-07-04 17:08:22 -04001491
Amnab8c33bb2023-08-03 14:40:01 -04001492 bob->connectionManager->onICERequest([&](const DeviceId&) {
1493 successfullyReceive = true;
1494 cv.notify_one();
1495 return true;
1496 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001497
Amnab8c33bb2023-08-03 14:40:01 -04001498 alice->connectionManager->connectDevice(bob->id.second,
1499 "git://*",
1500 [&](std::shared_ptr<ChannelSocket> socket,
1501 const DeviceId&) {
1502 notConnected = !socket;
1503 cv.notify_one();
1504 });
Morteza Namvar82960b32023-07-04 17:08:22 -04001505
Amnab8c33bb2023-08-03 14:40:01 -04001506 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return successfullyReceive; }));
1507 // Manager::instance().setAccountActive(alice->id.second, false, true);
Morteza Namvar82960b32023-07-04 17:08:22 -04001508
Amnab8c33bb2023-08-03 14:40:01 -04001509 // Just move destruction on another thread.
1510 // dht::threadpool::io().run([conMgr =std::move(alice->connectionManager)] {});
1511 alice->connectionManager.reset();
Morteza Namvar82960b32023-07-04 17:08:22 -04001512
Amnab8c33bb2023-08-03 14:40:01 -04001513 CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&] { return notConnected; }));
1514}
Adrien Béraud75754b22023-10-17 09:16:06 -04001515
Amnab8c33bb2023-08-03 14:40:01 -04001516void
1517ConnectionManagerTest::testGetChannelList()
1518{
Amnab8c33bb2023-08-03 14:40:01 -04001519 bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
Amnab8c33bb2023-08-03 14:40:01 -04001520 std::condition_variable cv;
1521 std::unique_lock<std::mutex> lk {mtx};
1522 bool successfullyConnected = false;
1523 int receiverConnected = 0;
1524 bob->connectionManager->onChannelRequest(
1525 [](const std::shared_ptr<dht::crypto::Certificate>&, const std::string&) { return true; });
1526 bob->connectionManager->onConnectionReady(
Adrien Béraud75754b22023-10-17 09:16:06 -04001527 [&](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket> socket) {
1528 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001529 if (socket)
1530 receiverConnected += 1;
Amnab8c33bb2023-08-03 14:40:01 -04001531 cv.notify_one();
1532 });
1533 std::string channelId;
1534 alice->connectionManager->connectDevice(bob->id.second,
1535 "git://*",
1536 [&](std::shared_ptr<ChannelSocket> socket,
1537 const DeviceId&) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001538 std::lock_guard<std::mutex> lk {mtx};
Amnab8c33bb2023-08-03 14:40:01 -04001539 if (socket) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001540 channelId = fmt::format(FMT_COMPILE("{:x}"), socket->channel());
Amnab8c33bb2023-08-03 14:40:01 -04001541 successfullyConnected = true;
1542 }
Amnab8c33bb2023-08-03 14:40:01 -04001543 cv.notify_one();
1544 });
1545 CPPUNIT_ASSERT(
1546 cv.wait_for(lk, 60s, [&] { return successfullyConnected && receiverConnected == 1; }));
1547 std::vector<std::map<std::string, std::string>> expectedList = {
Adrien Béraud75754b22023-10-17 09:16:06 -04001548 {{"id", channelId}, {"name", "git://*"}}};
Amnab8c33bb2023-08-03 14:40:01 -04001549 auto connectionList = alice->connectionManager->getConnectionList();
1550 CPPUNIT_ASSERT(!connectionList.empty());
1551 const auto& connectionInfo = connectionList[0];
1552 auto it = connectionInfo.find("id");
1553 CPPUNIT_ASSERT(it != connectionInfo.end());
Adrien Béraud75754b22023-10-17 09:16:06 -04001554 auto actualList = alice->connectionManager->getChannelList(it->second);
Amnab8c33bb2023-08-03 14:40:01 -04001555 CPPUNIT_ASSERT(expectedList.size() == actualList.size());
Amnab8c33bb2023-08-03 14:40:01 -04001556 for (const auto& expectedMap : expectedList) {
Adrien Béraud75754b22023-10-17 09:16:06 -04001557 CPPUNIT_ASSERT(std::find(actualList.begin(), actualList.end(), expectedMap)
1558 != actualList.end());
Amnab8c33bb2023-08-03 14:40:01 -04001559 }
1560}
Adrien Béraudefe27372023-05-27 18:56:29 -04001561
1562} // namespace test
Sébastien Blin464bdff2023-07-19 08:02:53 -04001563} // namespace dhtnet
Adrien Béraudefe27372023-05-27 18:56:29 -04001564
Adrien Béraud1ae60aa2023-07-07 09:55:09 -04001565JAMI_TEST_RUNNER(dhtnet::test::ConnectionManagerTest::name())