blob: a2a910f9ab1086cabe37bbfe01c670079376c828 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19#pragma once
20
21#include <mutex>
22#include <condition_variable>
23#include <deque>
24#include <algorithm>
25
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040026namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040027
28class PeerChannel
29{
30public:
31 PeerChannel() {}
32 ~PeerChannel() { stop(); }
33 PeerChannel(PeerChannel&& o)
34 {
35 std::lock_guard<std::mutex> lk(o.mutex_);
36 stream_ = std::move(o.stream_);
37 stop_ = o.stop_;
38 o.cv_.notify_all();
39 }
40
41 template<typename Duration>
42 ssize_t wait(Duration timeout, std::error_code& ec)
43 {
44 std::unique_lock<std::mutex> lk {mutex_};
45 cv_.wait_for(lk, timeout, [this] { return stop_ or not stream_.empty(); });
46 if (stop_) {
47 ec = std::make_error_code(std::errc::broken_pipe);
48 return -1;
49 }
50 ec.clear();
51 return stream_.size();
52 }
53
54 ssize_t read(char* output, std::size_t size, std::error_code& ec)
55 {
56 std::unique_lock<std::mutex> lk {mutex_};
57 cv_.wait(lk, [this] { return stop_ or not stream_.empty(); });
58 if (stream_.size()) {
59 auto toRead = std::min(size, stream_.size());
60 if (toRead) {
61 auto endIt = stream_.begin() + toRead;
62 std::copy(stream_.begin(), endIt, output);
63 stream_.erase(stream_.begin(), endIt);
64 }
65 ec.clear();
66 return toRead;
67 }
68 if (stop_) {
69 ec.clear();
70 return 0;
71 }
72 ec = std::make_error_code(std::errc::resource_unavailable_try_again);
73 return -1;
74 }
75
76 ssize_t write(const char* data, std::size_t size, std::error_code& ec)
77 {
78 std::lock_guard<std::mutex> lk {mutex_};
79 if (stop_) {
80 ec = std::make_error_code(std::errc::broken_pipe);
81 return -1;
82 }
83 stream_.insert(stream_.end(), data, data + size);
84 cv_.notify_all();
85 ec.clear();
86 return size;
87 }
88
89 void stop() noexcept
90 {
91 std::lock_guard<std::mutex> lk {mutex_};
92 if (stop_)
93 return;
94 stop_ = true;
95 cv_.notify_all();
96 }
97
98private:
99 PeerChannel(const PeerChannel& o) = delete;
100 PeerChannel& operator=(const PeerChannel& o) = delete;
101 PeerChannel& operator=(PeerChannel&& o) = delete;
102
103 std::mutex mutex_ {};
104 std::condition_variable cv_ {};
105 std::deque<char> stream_;
106 bool stop_ {false};
107};
108
109} // namespace jami