diff foosdk/sdk/pfc/wait_queue.h @ 1:20d02a178406 default tip

*: check in everything else yay
author Paper <paper@tflc.us>
date Mon, 05 Jan 2026 02:15:46 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/foosdk/sdk/pfc/wait_queue.h	Mon Jan 05 02:15:46 2026 -0500
@@ -0,0 +1,215 @@
+#pragma once
+
+#include <list>
+#include "synchro.h"
+
+namespace pfc {
+
+	template<typename obj_t>
+	class waitQueue {
+	public:
+		waitQueue() : m_eof() {}
+		
+		template<typename arg_t>
+		void put( arg_t && obj ) {
+			mutexScope guard( m_mutex );
+			m_list.push_back( std::forward<arg_t>(obj) );
+			if ( m_list.size() == 1 ) m_canRead.set_state( true );
+		}
+
+		void set_eof() {
+			mutexScope guard(m_mutex);
+			m_eof = true;
+			m_canRead.set_state(true);
+		}
+        bool wait_read( double timeout ) {
+            return m_canRead.wait_for( timeout );
+        }
+        eventHandle_t get_event_handle() {
+            return m_canRead.get_handle();
+        }
+
+		bool get( obj_t & out ) {
+			for ( ;; ) {
+				m_canRead.wait_for(-1);
+				mutexScope guard(m_mutex);
+				auto i = m_list.begin();
+				if ( i == m_list.end() ) {
+					if (m_eof) return false;
+					continue;
+				}
+				out = std::move(*i);
+				m_list.erase( i );
+				didGet();
+				return true;
+			}
+		}
+
+		bool get( obj_t & out, pfc::eventHandle_t hAbort, bool * didAbort = nullptr ) {
+			if (didAbort != nullptr) * didAbort = false;
+			for ( ;; ) {
+				if (pfc::event::g_twoEventWait( hAbort, m_canRead.get_handle(), -1) == 1) {
+					if (didAbort != nullptr) * didAbort = true;
+					return false;
+				}
+				mutexScope guard(m_mutex);
+				auto i = m_list.begin();
+				if ( i == m_list.end() ) {
+					if (m_eof) return false;
+					continue;
+				}
+				out = std::move(*i);
+				m_list.erase( i );
+				didGet();
+				return true;
+			}
+		}
+		void clear() {
+			mutexScope guard(m_mutex);
+			m_eof = false;
+			m_list.clear();
+			m_canRead.set_state(false);
+		}
+	private:
+		void didGet() {
+			if (!m_eof && m_list.size() == 0) m_canRead.set_state(false);
+		}
+		bool m_eof;
+		std::list<obj_t> m_list;
+		mutex m_mutex;
+		event m_canRead;
+	};
+
+	template<typename obj_t_>
+	class waitQueue2 {
+	protected:
+		typedef obj_t_ obj_t;
+		typedef std::list<obj_t_> list_t;
+		virtual bool canWriteCheck(list_t const &) { return true; }
+
+	public:
+		void waitWrite() {
+			m_canWrite.wait_for(-1);
+		}
+		bool waitWrite(pfc::eventHandle_t hAbort) {
+			return event::g_twoEventWait( hAbort, m_canWrite.get_handle(), -1) == 2;
+		}
+		eventHandle_t waitWriteHandle() {
+			return m_canWrite.get_handle();
+		}
+
+		waitQueue2() {
+			m_canWrite.set_state(true);
+		}
+
+		template<typename arg_t>
+		void put(arg_t && obj) {
+			mutexScope guard(m_mutex);
+			m_list.push_back(std::forward<arg_t>(obj));
+			if (m_list.size() == 1) m_canRead.set_state(true);
+			refreshCanWrite();
+		}
+
+		void set_eof() {
+			mutexScope guard(m_mutex);
+			m_eof = true;
+			m_canRead.set_state(true);
+			m_canWrite.set_state(false);
+		}
+
+		bool get(obj_t & out) {
+			for (;; ) {
+				m_canRead.wait_for(-1);
+				mutexScope guard(m_mutex);
+				auto i = m_list.begin();
+				if (i == m_list.end()) {
+					if (m_eof) return false;
+					continue;
+				}
+				out = std::move(*i);
+				m_list.erase(i);
+				didGet();
+				return true;
+			}
+		}
+
+		typedef std::function<bool(obj_t&)> receive_peek_t;
+		// Block until there's something to return + return multiple objects at once.
+		// Use peek function (optional) to stop reading / leave remaining items for the next call to pick up.
+		std::list<obj_t> receive(pfc::eventHandle_t hAbort, receive_peek_t peek = nullptr , bool* didAbort = nullptr) {
+			if (didAbort != nullptr) *didAbort = false;
+			std::list<obj_t> ret;
+			for (bool retry = false; ; retry = true ) {
+				// try without wait first, only place where this is really used can poll abort before/after without system calls
+				if (retry && pfc::event::g_twoEventWait(hAbort, m_canRead.get_handle(), -1) == 1) {
+					if (didAbort != nullptr) *didAbort = true;
+					break;
+				}
+				mutexScope guard(m_mutex);
+				auto i = m_list.begin();
+				if (i == m_list.end()) {
+					if (m_eof) break;
+					continue;
+				}
+				bool bDidGet = false;
+				do {
+					if (peek && !peek(*i)) break;
+					auto n = i; ++n;
+					ret.splice(ret.end(), m_list, i);
+					i = std::move(n);
+					bDidGet = true;
+				} while (i != m_list.end());
+				if ( bDidGet ) didGet();
+				break;
+			}
+			return ret;
+		}
+
+		bool get(obj_t & out, pfc::eventHandle_t hAbort, bool * didAbort = nullptr) {
+			if (didAbort != nullptr) * didAbort = false;
+			for (;; ) {
+				if (pfc::event::g_twoEventWait(hAbort, m_canRead.get_handle(), -1) == 1) {
+					if (didAbort != nullptr) * didAbort = true;
+					return false;
+				}
+				mutexScope guard(m_mutex);
+				auto i = m_list.begin();
+				if (i == m_list.end()) {
+					if (m_eof) return false;
+					continue;
+				}
+				out = std::move(*i);
+				m_list.erase(i);
+				didGet();
+				return true;
+			}
+		}
+
+		void clear() {
+			mutexScope guard(m_mutex);
+			m_list.clear();
+			m_eof = false;
+			m_canRead.set_state(false);
+			m_canWrite.set_state(true);
+		}
+	private:
+		void didGet() {
+            // mutex assumed locked
+			if (m_eof) return;
+			if (m_list.empty()) {
+				m_canRead.set_state(false);
+				m_canWrite.set_state(true);
+			} else {
+				m_canWrite.set_state(canWriteCheck(m_list));
+			}
+		}
+		void refreshCanWrite() {
+            // mutex assumed locked
+			m_canWrite.set_state( !m_eof && canWriteCheck(m_list));
+		}
+		bool m_eof = false;
+		std::list<obj_t> m_list;
+		mutex m_mutex;
+		event m_canRead, m_canWrite;
+	};
+}