blob: 51b80a55c75689d969d8b755a755ce50b89c0266 [file] [log] [blame]
Amna4a70f5c2023-09-14 17:32:05 -04001/*
2 * Copyright (C) 2023 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "dsh.h"
18#include "../common.h"
19#include <opendht/log.h>
20#include <opendht/crypto.h>
21
22#include <asio/io_context.hpp>
23#include <sys/types.h>
24#include <sys/wait.h>
25namespace dhtnet {
26
27const int READ_END = 0;
28const int WRITE_END = 1;
29
30void
Adrien Béraud651a4c62023-09-20 14:17:08 -040031create_pipe(int apipe[2])
Amna4a70f5c2023-09-14 17:32:05 -040032{
Adrien Béraud651a4c62023-09-20 14:17:08 -040033#ifdef __APPLE__
34 if (pipe(apipe) < 0)
35 perror("pipe");
36
37 if (fcntl(apipe[0], F_SETFD, FD_CLOEXEC) < 0)
38 perror("unable to set pipe FD_CLOEXEC");
39
40 if (fcntl(apipe[1], F_SETFD, FD_CLOEXEC) < 0)
41 perror("unable to set pipe FD_CLOEXEC");
42#else
43 if (pipe2(apipe, O_CLOEXEC) == -1) {
Amna4a70f5c2023-09-14 17:32:05 -040044 perror("pipe2");
45 exit(EXIT_FAILURE);
46 }
Adrien Béraud651a4c62023-09-20 14:17:08 -040047#endif
Amna4a70f5c2023-09-14 17:32:05 -040048}
Adrien Béraud651a4c62023-09-20 14:17:08 -040049
Amna4a70f5c2023-09-14 17:32:05 -040050void
51child_proc(const int in_pipe[2],
52 const int out_pipe[2],
53 const int error_pipe[2],
54 const std::string& name)
55{
56 // close unused write end of input pipe and read end of output pipe
57 close(in_pipe[WRITE_END]);
58 close(out_pipe[READ_END]);
59 close(error_pipe[READ_END]);
60
61 // replace stdin with input pipe
62 if (dup2(in_pipe[READ_END], STDIN_FILENO) == -1) {
63 perror("dup2: error replacing stdin");
64 exit(EXIT_FAILURE);
65 }
66
67 // replace stdout with output pipe
68 if (dup2(out_pipe[WRITE_END], STDOUT_FILENO) == -1) {
69 perror("dup2: error replacing stdout");
70 exit(EXIT_FAILURE);
71 }
72 // replace stderr with error pipe
73 if (dup2(error_pipe[WRITE_END], STDERR_FILENO) == -1) {
74 perror("dup2: error replacing stderr");
75 exit(EXIT_FAILURE);
76 }
77
78 // prepare arguments
79 const char* args[] = {name.c_str(), NULL};
80 // execute subprocess
81 execvp(args[0], const_cast<char* const*>(args));
82
83 // if execv returns, an error occurred
84 perror("execvp");
85 exit(EXIT_FAILURE);
86}
87
88dhtnet::Dsh::Dsh(const std::filesystem::path& path,
89 dht::crypto::Identity identity,
90 const std::string& bootstrap)
91 : logger(dht::log::getStdLogger())
92 // , std::shared_ptr<tls::CertificateStore>(path / "certstore", logger)
93{
94 auto certStore = std::make_shared<tls::CertificateStore>(path / "certstore", logger);
95
96 ioContext = std::make_shared<asio::io_context>();
97 ioContextRunner = std::thread([context = ioContext, logger = logger] {
98 try {
99 auto work = asio::make_work_guard(*context);
100 context->run();
101 } catch (const std::exception& ex) {
102 if (logger)
103 logger->error("Error in ioContextRunner: {}", ex.what());
104 }
105 });
106 // Build a server
107 auto config = connectionManagerConfig(path,
108 identity,
109 bootstrap,
110 logger,
111 certStore,
112 ioContext,
113 factory);
114 // create a connection manager
115 connectionManager = std::make_unique<ConnectionManager>(std::move(config));
116
117 connectionManager->onDhtConnected(identity.first->getPublicKey());
118 connectionManager->onICERequest([this](const dht::Hash<32>&) { // handle ICE request
119 if (logger)
120 logger->debug("ICE request received");
121 return true;
122 });
123
124 std::mutex mtx;
125 std::unique_lock<std::mutex> lk {mtx};
126
127 connectionManager->onChannelRequest(
128 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
129 // handle channel request
130 if (logger)
131 logger->debug("Channel request received");
132 return true;
133 });
134
135 connectionManager->onConnectionReady([&](const DeviceId&,
136 const std::string& name,
137 std::shared_ptr<ChannelSocket> socket) {
138 // handle connection ready
139 try {
140 // Create a pipe for communication with the subprocess
141 // create pipes
142 int in_pipe[2], out_pipe[2], error_pipe[2];
143 create_pipe(in_pipe);
144 create_pipe(out_pipe);
145 create_pipe(error_pipe);
146
147 ioContext->notify_fork(asio::io_context::fork_prepare);
148
149 // Fork to create a child process
150 pid_t pid = fork();
151 if (pid == -1) {
152 perror("fork");
153 return EXIT_FAILURE;
154 } else if (pid == 0) { // Child process
155 ioContext->notify_fork(asio::io_context::fork_child);
156 child_proc(in_pipe, out_pipe, error_pipe, name);
157 return EXIT_SUCCESS; // never reached
158 } else {
159 ioContext->notify_fork(asio::io_context::fork_parent);
160
161 // close unused read end of input pipe and write end of output pipe
162 close(in_pipe[READ_END]);
163 close(out_pipe[WRITE_END]);
164 close(error_pipe[WRITE_END]);
165
166 asio::io_context& ioContextRef = *ioContext;
167 // create stream descriptors
168 auto inStream
169 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
170 in_pipe[WRITE_END]);
171 auto outStream
172 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
173 out_pipe[READ_END]);
174 auto errorStream
175 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
176 error_pipe[READ_END]);
177
178 if (socket) {
179 socket->setOnRecv([this, socket, inStream](const uint8_t* data, size_t size) {
180 auto data_copy = std::make_shared<std::vector<uint8_t>>(data, data + size);
181 // write on pipe to sub child
182 std::error_code ec;
183 asio::async_write(*inStream,
184 asio::buffer(*data_copy),
185 [data_copy, this](const std::error_code& error,
186 std::size_t bytesWritten) {
187 if (error) {
188 if (logger)
189 logger->error("Write error: {}",
190 error.message());
191 }
192 });
193 return size;
194 });
195
196 // read from pipe to sub child
197
198 // Create a buffer to read data into
199 auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
200
201 // Create a shared_ptr to the stream_descriptor
202 readFromPipe(socket, outStream, buffer);
203 readFromPipe(socket, errorStream, buffer);
204
205 return EXIT_SUCCESS;
206 }
207 }
208
209 } catch (const std::exception& e) {
210 if (logger)
211 logger->error("Error: {}", e.what());
212 }
213 return 0;
214 });
215}
216
217dhtnet::Dsh::Dsh(const std::filesystem::path& path,
218 dht::crypto::Identity identity,
219 const std::string& bootstrap,
220 dht::InfoHash peer_id,
221 const std::string& binary)
222 : Dsh(path, identity, bootstrap)
223{
224 // Build a client
225 std::condition_variable cv;
226 connectionManager->connectDevice(
227 peer_id, binary, [&](std::shared_ptr<ChannelSocket> socket, const dht::InfoHash&) {
228 if (socket) {
229 socket->setOnRecv([this, socket](const uint8_t* data, size_t size) {
230 std::cout.write((const char*) data, size);
231 std::cout.flush();
232 return size;
233 });
234 // Create a buffer to read data into
235 auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
236
237 // Create a shared_ptr to the stream_descriptor
238 auto stdinPipe = std::make_shared<asio::posix::stream_descriptor>(*ioContext,
239 ::dup(
240 STDIN_FILENO));
241 readFromPipe(socket, stdinPipe, buffer);
242
243 socket->onShutdown([this]() {
244 if (logger)
245 logger->debug("Exit program");
246 ioContext->stop();
247 });
248 }
249 });
250
251 connectionManager->onConnectionReady([&](const DeviceId&,
252 const std::string& name,
253 std::shared_ptr<ChannelSocket> socket_received) {
254 if (logger)
255 logger->debug("Connected!");
256 });
257}
258
259void
260dhtnet::Dsh::run()
261{
262 ioContext->run();
263}
264
265dhtnet::Dsh::~Dsh()
266{
267 ioContext->stop();
268 ioContextRunner.join();
269}
270
271} // namespace dhtnet