blob: 62dd76ed0356336b8867b95fa98e18f12f2d3985 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#pragma once
18
19#include <mutex>
20#include <condition_variable>
21#include <deque>
22#include <algorithm>
23
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040024namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040025
/**
 * In-memory byte queue shared between writers and blocking readers.
 *
 * write() appends bytes to an internal deque, read() removes them from the
 * front, and wait() reports how many bytes are queued. Every public
 * operation locks mutex_ before touching stream_ or stop_, and cv_ is
 * notified whenever bytes are appended or the channel is stopped.
 */
class PeerChannel
{
public:
    PeerChannel() = default;

    /** Stops the channel so that blocked readers/waiters are notified. */
    ~PeerChannel() { stop(); }

    /**
     * Builds a channel from the queued bytes and the stop flag of @p o.
     *
     * The transfer happens while o.mutex_ is held. Threads blocked on @p o
     * are then notified; o's own stop flag is left as it was.
     */
    PeerChannel(PeerChannel&& o)
    {
        std::lock_guard<std::mutex> lk(o.mutex_);
        stream_ = std::move(o.stream_);
        stop_ = o.stop_;
        o.cv_.notify_all();
    }

    /**
     * Blocks until bytes are queued, the channel is stopped, or @p timeout
     * elapses.
     *
     * @param timeout  maximum time to block (any std::chrono duration).
     * @param ec       set to broken_pipe if the channel is stopped,
     *                 cleared otherwise.
     * @return -1 if the channel is stopped (even if bytes are still queued);
     *         otherwise the number of queued bytes, which is 0 when the
     *         timeout elapsed without any data arriving.
     */
    template<typename Duration>
    ssize_t wait(Duration timeout, std::error_code& ec)
    {
        std::unique_lock<std::mutex> lk {mutex_};
        cv_.wait_for(lk, timeout, [this] { return stop_ or not stream_.empty(); });
        if (stop_) {
            ec = std::make_error_code(std::errc::broken_pipe);
            return -1;
        }
        ec.clear();
        return static_cast<ssize_t>(stream_.size());
    }

    /**
     * Blocks until bytes are queued or the channel is stopped, then moves up
     * to @p size bytes from the front of the queue into @p output.
     *
     * Bytes queued before stop() can still be read afterwards.
     *
     * @param output  destination buffer, at least @p size bytes long.
     * @param size    maximum number of bytes to copy.
     * @param ec      always cleared on return.
     * @return number of bytes copied; 0 when the channel is stopped and
     *         empty, or when @p size is 0.
     */
    ssize_t read(char* output, std::size_t size, std::error_code& ec)
    {
        std::unique_lock<std::mutex> lk {mutex_};
        cv_.wait(lk, [this] { return stop_ or not stream_.empty(); });
        // The predicate form of wait() only returns once the predicate holds,
        // with the lock re-acquired: either data is queued, or the channel is
        // stopped. No other state is reachable here, so there is no error path.
        ec.clear();
        if (stream_.empty())
            return 0; // stopped and fully drained

        const auto toRead = std::min(size, stream_.size());
        if (toRead) {
            const auto endIt = stream_.begin()
                               + static_cast<std::deque<char>::difference_type>(toRead);
            std::copy(stream_.begin(), endIt, output);
            stream_.erase(stream_.begin(), endIt);
        }
        return static_cast<ssize_t>(toRead);
    }

    /**
     * Appends @p size bytes from @p data to the queue and notifies waiters.
     *
     * @param data  source buffer.
     * @param size  number of bytes to append.
     * @param ec    set to broken_pipe if the channel is stopped,
     *              cleared otherwise.
     * @return @p size on success, -1 if the channel is stopped.
     */
    ssize_t write(const char* data, std::size_t size, std::error_code& ec)
    {
        std::lock_guard<std::mutex> lk {mutex_};
        if (stop_) {
            ec = std::make_error_code(std::errc::broken_pipe);
            return -1;
        }
        stream_.insert(stream_.end(), data, data + size);
        cv_.notify_all();
        ec.clear();
        return static_cast<ssize_t>(size);
    }

    /**
     * Marks the channel as stopped and notifies every waiter.
     * Calling it again once stopped does nothing.
     */
    void stop() noexcept
    {
        std::lock_guard<std::mutex> lk {mutex_};
        if (stop_)
            return;
        stop_ = true;
        cv_.notify_all();
    }

private:
    // Copying and assignment are disabled; only move construction is offered.
    PeerChannel(const PeerChannel& o) = delete;
    PeerChannel& operator=(const PeerChannel& o) = delete;
    PeerChannel& operator=(PeerChannel&& o) = delete;

    std::mutex mutex_ {};           // guards stream_ and stop_
    std::condition_variable cv_ {}; // notified on write() and stop()
    std::deque<char> stream_;       // queued bytes, oldest at the front
    bool stop_ {false};             // set once by stop()
};
106
107} // namespace dhtnet