comparison foosdk/sdk/pfc/wait_queue.h @ 1:20d02a178406 default tip

*: check in everything else yay
author Paper <paper@tflc.us>
date Mon, 05 Jan 2026 02:15:46 -0500
parents
children
comparison
equal deleted inserted replaced
0:e9bb126753e7 1:20d02a178406
1 #pragma once
2
3 #include <list>
4 #include "synchro.h"
5
6 namespace pfc {
7
8 template<typename obj_t>
9 class waitQueue {
10 public:
11 waitQueue() : m_eof() {}
12
13 template<typename arg_t>
14 void put( arg_t && obj ) {
15 mutexScope guard( m_mutex );
16 m_list.push_back( std::forward<arg_t>(obj) );
17 if ( m_list.size() == 1 ) m_canRead.set_state( true );
18 }
19
20 void set_eof() {
21 mutexScope guard(m_mutex);
22 m_eof = true;
23 m_canRead.set_state(true);
24 }
25 bool wait_read( double timeout ) {
26 return m_canRead.wait_for( timeout );
27 }
28 eventHandle_t get_event_handle() {
29 return m_canRead.get_handle();
30 }
31
32 bool get( obj_t & out ) {
33 for ( ;; ) {
34 m_canRead.wait_for(-1);
35 mutexScope guard(m_mutex);
36 auto i = m_list.begin();
37 if ( i == m_list.end() ) {
38 if (m_eof) return false;
39 continue;
40 }
41 out = std::move(*i);
42 m_list.erase( i );
43 didGet();
44 return true;
45 }
46 }
47
48 bool get( obj_t & out, pfc::eventHandle_t hAbort, bool * didAbort = nullptr ) {
49 if (didAbort != nullptr) * didAbort = false;
50 for ( ;; ) {
51 if (pfc::event::g_twoEventWait( hAbort, m_canRead.get_handle(), -1) == 1) {
52 if (didAbort != nullptr) * didAbort = true;
53 return false;
54 }
55 mutexScope guard(m_mutex);
56 auto i = m_list.begin();
57 if ( i == m_list.end() ) {
58 if (m_eof) return false;
59 continue;
60 }
61 out = std::move(*i);
62 m_list.erase( i );
63 didGet();
64 return true;
65 }
66 }
67 void clear() {
68 mutexScope guard(m_mutex);
69 m_eof = false;
70 m_list.clear();
71 m_canRead.set_state(false);
72 }
73 private:
74 void didGet() {
75 if (!m_eof && m_list.size() == 0) m_canRead.set_state(false);
76 }
77 bool m_eof;
78 std::list<obj_t> m_list;
79 mutex m_mutex;
80 event m_canRead;
81 };
82
83 template<typename obj_t_>
84 class waitQueue2 {
85 protected:
86 typedef obj_t_ obj_t;
87 typedef std::list<obj_t_> list_t;
88 virtual bool canWriteCheck(list_t const &) { return true; }
89
90 public:
91 void waitWrite() {
92 m_canWrite.wait_for(-1);
93 }
94 bool waitWrite(pfc::eventHandle_t hAbort) {
95 return event::g_twoEventWait( hAbort, m_canWrite.get_handle(), -1) == 2;
96 }
97 eventHandle_t waitWriteHandle() {
98 return m_canWrite.get_handle();
99 }
100
101 waitQueue2() {
102 m_canWrite.set_state(true);
103 }
104
105 template<typename arg_t>
106 void put(arg_t && obj) {
107 mutexScope guard(m_mutex);
108 m_list.push_back(std::forward<arg_t>(obj));
109 if (m_list.size() == 1) m_canRead.set_state(true);
110 refreshCanWrite();
111 }
112
113 void set_eof() {
114 mutexScope guard(m_mutex);
115 m_eof = true;
116 m_canRead.set_state(true);
117 m_canWrite.set_state(false);
118 }
119
120 bool get(obj_t & out) {
121 for (;; ) {
122 m_canRead.wait_for(-1);
123 mutexScope guard(m_mutex);
124 auto i = m_list.begin();
125 if (i == m_list.end()) {
126 if (m_eof) return false;
127 continue;
128 }
129 out = std::move(*i);
130 m_list.erase(i);
131 didGet();
132 return true;
133 }
134 }
135
136 typedef std::function<bool(obj_t&)> receive_peek_t;
137 // Block until there's something to return + return multiple objects at once.
138 // Use peek function (optional) to stop reading / leave remaining items for the next call to pick up.
139 std::list<obj_t> receive(pfc::eventHandle_t hAbort, receive_peek_t peek = nullptr , bool* didAbort = nullptr) {
140 if (didAbort != nullptr) *didAbort = false;
141 std::list<obj_t> ret;
142 for (bool retry = false; ; retry = true ) {
143 // try without wait first, only place where this is really used can poll abort before/after without system calls
144 if (retry && pfc::event::g_twoEventWait(hAbort, m_canRead.get_handle(), -1) == 1) {
145 if (didAbort != nullptr) *didAbort = true;
146 break;
147 }
148 mutexScope guard(m_mutex);
149 auto i = m_list.begin();
150 if (i == m_list.end()) {
151 if (m_eof) break;
152 continue;
153 }
154 bool bDidGet = false;
155 do {
156 if (peek && !peek(*i)) break;
157 auto n = i; ++n;
158 ret.splice(ret.end(), m_list, i);
159 i = std::move(n);
160 bDidGet = true;
161 } while (i != m_list.end());
162 if ( bDidGet ) didGet();
163 break;
164 }
165 return ret;
166 }
167
168 bool get(obj_t & out, pfc::eventHandle_t hAbort, bool * didAbort = nullptr) {
169 if (didAbort != nullptr) * didAbort = false;
170 for (;; ) {
171 if (pfc::event::g_twoEventWait(hAbort, m_canRead.get_handle(), -1) == 1) {
172 if (didAbort != nullptr) * didAbort = true;
173 return false;
174 }
175 mutexScope guard(m_mutex);
176 auto i = m_list.begin();
177 if (i == m_list.end()) {
178 if (m_eof) return false;
179 continue;
180 }
181 out = std::move(*i);
182 m_list.erase(i);
183 didGet();
184 return true;
185 }
186 }
187
188 void clear() {
189 mutexScope guard(m_mutex);
190 m_list.clear();
191 m_eof = false;
192 m_canRead.set_state(false);
193 m_canWrite.set_state(true);
194 }
195 private:
196 void didGet() {
197 // mutex assumed locked
198 if (m_eof) return;
199 if (m_list.empty()) {
200 m_canRead.set_state(false);
201 m_canWrite.set_state(true);
202 } else {
203 m_canWrite.set_state(canWriteCheck(m_list));
204 }
205 }
206 void refreshCanWrite() {
207 // mutex assumed locked
208 m_canWrite.set_state( !m_eof && canWriteCheck(m_list));
209 }
210 bool m_eof = false;
211 std::list<obj_t> m_list;
212 mutex m_mutex;
213 event m_canRead, m_canWrite;
214 };
215 }