blob: cd9df820cc0f931857a8a2d2db94ead04fdff225 [file] [log] [blame]
Amna4a70f5c2023-09-14 17:32:05 -04001/*
2 * Copyright (C) 2023 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "dsh.h"
18#include "../common.h"
19#include <opendht/log.h>
20#include <opendht/crypto.h>
21
22#include <asio/io_context.hpp>
23#include <sys/types.h>
24#include <sys/wait.h>
25namespace dhtnet {
26
27const int READ_END = 0;
28const int WRITE_END = 1;
29
30void
31create_pipe(int pipe[2])
32{
33 if (pipe2(pipe, O_CLOEXEC) == -1) {
34 perror("pipe2");
35 exit(EXIT_FAILURE);
36 }
37}
38void
39child_proc(const int in_pipe[2],
40 const int out_pipe[2],
41 const int error_pipe[2],
42 const std::string& name)
43{
44 // close unused write end of input pipe and read end of output pipe
45 close(in_pipe[WRITE_END]);
46 close(out_pipe[READ_END]);
47 close(error_pipe[READ_END]);
48
49 // replace stdin with input pipe
50 if (dup2(in_pipe[READ_END], STDIN_FILENO) == -1) {
51 perror("dup2: error replacing stdin");
52 exit(EXIT_FAILURE);
53 }
54
55 // replace stdout with output pipe
56 if (dup2(out_pipe[WRITE_END], STDOUT_FILENO) == -1) {
57 perror("dup2: error replacing stdout");
58 exit(EXIT_FAILURE);
59 }
60 // replace stderr with error pipe
61 if (dup2(error_pipe[WRITE_END], STDERR_FILENO) == -1) {
62 perror("dup2: error replacing stderr");
63 exit(EXIT_FAILURE);
64 }
65
66 // prepare arguments
67 const char* args[] = {name.c_str(), NULL};
68 // execute subprocess
69 execvp(args[0], const_cast<char* const*>(args));
70
71 // if execv returns, an error occurred
72 perror("execvp");
73 exit(EXIT_FAILURE);
74}
75
76dhtnet::Dsh::Dsh(const std::filesystem::path& path,
77 dht::crypto::Identity identity,
78 const std::string& bootstrap)
79 : logger(dht::log::getStdLogger())
80 // , std::shared_ptr<tls::CertificateStore>(path / "certstore", logger)
81{
82 auto certStore = std::make_shared<tls::CertificateStore>(path / "certstore", logger);
83
84 ioContext = std::make_shared<asio::io_context>();
85 ioContextRunner = std::thread([context = ioContext, logger = logger] {
86 try {
87 auto work = asio::make_work_guard(*context);
88 context->run();
89 } catch (const std::exception& ex) {
90 if (logger)
91 logger->error("Error in ioContextRunner: {}", ex.what());
92 }
93 });
94 // Build a server
95 auto config = connectionManagerConfig(path,
96 identity,
97 bootstrap,
98 logger,
99 certStore,
100 ioContext,
101 factory);
102 // create a connection manager
103 connectionManager = std::make_unique<ConnectionManager>(std::move(config));
104
105 connectionManager->onDhtConnected(identity.first->getPublicKey());
106 connectionManager->onICERequest([this](const dht::Hash<32>&) { // handle ICE request
107 if (logger)
108 logger->debug("ICE request received");
109 return true;
110 });
111
112 std::mutex mtx;
113 std::unique_lock<std::mutex> lk {mtx};
114
115 connectionManager->onChannelRequest(
116 [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
117 // handle channel request
118 if (logger)
119 logger->debug("Channel request received");
120 return true;
121 });
122
123 connectionManager->onConnectionReady([&](const DeviceId&,
124 const std::string& name,
125 std::shared_ptr<ChannelSocket> socket) {
126 // handle connection ready
127 try {
128 // Create a pipe for communication with the subprocess
129 // create pipes
130 int in_pipe[2], out_pipe[2], error_pipe[2];
131 create_pipe(in_pipe);
132 create_pipe(out_pipe);
133 create_pipe(error_pipe);
134
135 ioContext->notify_fork(asio::io_context::fork_prepare);
136
137 // Fork to create a child process
138 pid_t pid = fork();
139 if (pid == -1) {
140 perror("fork");
141 return EXIT_FAILURE;
142 } else if (pid == 0) { // Child process
143 ioContext->notify_fork(asio::io_context::fork_child);
144 child_proc(in_pipe, out_pipe, error_pipe, name);
145 return EXIT_SUCCESS; // never reached
146 } else {
147 ioContext->notify_fork(asio::io_context::fork_parent);
148
149 // close unused read end of input pipe and write end of output pipe
150 close(in_pipe[READ_END]);
151 close(out_pipe[WRITE_END]);
152 close(error_pipe[WRITE_END]);
153
154 asio::io_context& ioContextRef = *ioContext;
155 // create stream descriptors
156 auto inStream
157 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
158 in_pipe[WRITE_END]);
159 auto outStream
160 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
161 out_pipe[READ_END]);
162 auto errorStream
163 = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
164 error_pipe[READ_END]);
165
166 if (socket) {
167 socket->setOnRecv([this, socket, inStream](const uint8_t* data, size_t size) {
168 auto data_copy = std::make_shared<std::vector<uint8_t>>(data, data + size);
169 // write on pipe to sub child
170 std::error_code ec;
171 asio::async_write(*inStream,
172 asio::buffer(*data_copy),
173 [data_copy, this](const std::error_code& error,
174 std::size_t bytesWritten) {
175 if (error) {
176 if (logger)
177 logger->error("Write error: {}",
178 error.message());
179 }
180 });
181 return size;
182 });
183
184 // read from pipe to sub child
185
186 // Create a buffer to read data into
187 auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
188
189 // Create a shared_ptr to the stream_descriptor
190 readFromPipe(socket, outStream, buffer);
191 readFromPipe(socket, errorStream, buffer);
192
193 return EXIT_SUCCESS;
194 }
195 }
196
197 } catch (const std::exception& e) {
198 if (logger)
199 logger->error("Error: {}", e.what());
200 }
201 return 0;
202 });
203}
204
205dhtnet::Dsh::Dsh(const std::filesystem::path& path,
206 dht::crypto::Identity identity,
207 const std::string& bootstrap,
208 dht::InfoHash peer_id,
209 const std::string& binary)
210 : Dsh(path, identity, bootstrap)
211{
212 // Build a client
213 std::condition_variable cv;
214 connectionManager->connectDevice(
215 peer_id, binary, [&](std::shared_ptr<ChannelSocket> socket, const dht::InfoHash&) {
216 if (socket) {
217 socket->setOnRecv([this, socket](const uint8_t* data, size_t size) {
218 std::cout.write((const char*) data, size);
219 std::cout.flush();
220 return size;
221 });
222 // Create a buffer to read data into
223 auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
224
225 // Create a shared_ptr to the stream_descriptor
226 auto stdinPipe = std::make_shared<asio::posix::stream_descriptor>(*ioContext,
227 ::dup(
228 STDIN_FILENO));
229 readFromPipe(socket, stdinPipe, buffer);
230
231 socket->onShutdown([this]() {
232 if (logger)
233 logger->debug("Exit program");
234 ioContext->stop();
235 });
236 }
237 });
238
239 connectionManager->onConnectionReady([&](const DeviceId&,
240 const std::string& name,
241 std::shared_ptr<ChannelSocket> socket_received) {
242 if (logger)
243 logger->debug("Connected!");
244 });
245}
246
247void
248dhtnet::Dsh::run()
249{
250 ioContext->run();
251}
252
253dhtnet::Dsh::~Dsh()
254{
255 ioContext->stop();
256 ioContextRunner.join();
257}
258
259} // namespace dhtnet