|
1
|
1 #pragma once
|
|
|
2 #include <pfc/wait_queue.h>
|
|
|
3 #include <pfc/pool.h>
|
|
|
4 #include <pfc/threads.h>
|
|
|
5 #include <functional>
|
|
|
6 #include "rethrow.h"
|
|
|
7 #include <pfc/timers.h>
|
|
|
8
|
|
|
9 namespace ThreadUtils {
|
|
|
10
|
|
|
11 typedef std::function<bool(pfc::eventHandle_t, double) > waitFunc_t;
|
|
|
12
|
|
|
13 // Serialize access to some resource to a single thread
|
|
|
14 // Execute blocking/nonabortable methods in with proper abortability (detach on abort and move on)
|
|
|
15 class cmdThread {
|
|
|
16 public:
|
|
|
17 typedef std::function<void () > func_t;
|
|
|
18 typedef pfc::waitQueue<func_t> queue_t;
|
|
|
19 typedef std::function<void (abort_callback&) > funcAbortable_t;
|
|
|
20
|
|
|
21 protected:
|
|
|
22 std::function<void () > makeWorker(waitFunc_t wf = pfc::event::g_wait_for) {
|
|
|
23 auto q = m_queue;
|
|
|
24 auto x = m_atExit;
|
|
|
25 return [q, x, wf] {
|
|
|
26 for ( ;; ) {
|
|
|
27 func_t f;
|
|
|
28 wf(q->get_event_handle(), -1);
|
|
|
29 if (!q->get(f)) break;
|
|
|
30 try { f(); } catch(...) {}
|
|
|
31 }
|
|
|
32 // No guard for atExit access, as nobody is supposed to be still able to call host object methods by the time we get here
|
|
|
33 for( auto i = x->begin(); i != x->end(); ++ i ) {
|
|
|
34 auto f = *i;
|
|
|
35 try { f(); } catch(...) {}
|
|
|
36 }
|
|
|
37 };
|
|
|
38 };
|
|
|
39 std::function<void () > makeWorker2( std::function<void()> updater, double interval, waitFunc_t wf = pfc::event::g_wait_for) {
|
|
|
40 auto q = m_queue;
|
|
|
41 auto x = m_atExit;
|
|
|
42 return [=] {
|
|
|
43 pfc::lores_timer t; t.start();
|
|
|
44 for ( ;; ) {
|
|
|
45 {
|
|
|
46 bool bWorkReady = false;
|
|
|
47 double left = interval - t.query();
|
|
|
48 if ( left > 0 ) {
|
|
|
49 if (wf(q->get_event_handle(), left)) bWorkReady = true;
|
|
|
50 }
|
|
|
51
|
|
|
52 if (!bWorkReady) {
|
|
|
53 updater();
|
|
|
54 t.start();
|
|
|
55 continue;
|
|
|
56 }
|
|
|
57 }
|
|
|
58
|
|
|
59 func_t f;
|
|
|
60 wf(q->get_event_handle(), -1);
|
|
|
61 if (!q->get(f)) break;
|
|
|
62 try { f(); } catch(...) {}
|
|
|
63 }
|
|
|
64 // No guard for atExit access, as nobody is supposed to be still able to call host object methods by the time we get here
|
|
|
65 for( auto i = x->begin(); i != x->end(); ++ i ) {
|
|
|
66 auto f = *i;
|
|
|
67 try { f(); } catch(...) {}
|
|
|
68 }
|
|
|
69 };
|
|
|
70 };
|
|
|
71
|
|
|
72 // For derived classes: create new instance without starting thread, supply thread using by yourself
|
|
|
73 class noCreate {};
|
|
|
74 cmdThread( noCreate ) {}
|
|
|
75 public:
|
|
|
76
|
|
|
77 cmdThread() {
|
|
|
78 pfc::splitThread( makeWorker() );
|
|
|
79 }
|
|
|
80
|
|
|
81 void atExit( func_t f ) {
|
|
|
82 m_atExit->push_back(f);
|
|
|
83 }
|
|
|
84 ~cmdThread() {
|
|
|
85 m_queue->set_eof();
|
|
|
86 }
|
|
|
87 void runSynchronously( func_t f ) { runSynchronously_(f, nullptr); }
|
|
|
88 void runSynchronously_( func_t f, abort_callback * abortOrNull ) {
|
|
|
89 auto evt = m_eventPool.make();
|
|
|
90 evt->set_state(false);
|
|
|
91 auto rethrow = std::make_shared<ThreadUtils::CRethrow>();
|
|
|
92 auto worker2 = [f, rethrow, evt] {
|
|
|
93 rethrow->exec(f);
|
|
|
94 evt->set_state( true );
|
|
|
95 };
|
|
|
96
|
|
|
97 add ( worker2 );
|
|
|
98
|
|
|
99 if ( abortOrNull != nullptr ) {
|
|
|
100 abortOrNull->waitForEvent( * evt, -1 );
|
|
|
101 } else {
|
|
|
102 evt->wait_for(-1);
|
|
|
103 }
|
|
|
104
|
|
|
105 m_eventPool.put( evt );
|
|
|
106
|
|
|
107 rethrow->rethrow();
|
|
|
108 }
|
|
|
109 void runSynchronously( func_t f, abort_callback & abort ) {
|
|
|
110 runSynchronously_(f, &abort);
|
|
|
111 }
|
|
|
112 void runSynchronously2( funcAbortable_t f, abort_callback & abort ) {
|
|
|
113 auto subAbort = m_abortPool.make();
|
|
|
114 subAbort->reset();
|
|
|
115 auto worker = [subAbort, f] {
|
|
|
116 f(*subAbort);
|
|
|
117 };
|
|
|
118
|
|
|
119 try {
|
|
|
120 runSynchronously( worker, abort );
|
|
|
121 } catch(...) {
|
|
|
122 subAbort->set(); throw;
|
|
|
123 }
|
|
|
124
|
|
|
125 m_abortPool.put( subAbort );
|
|
|
126 }
|
|
|
127
|
|
|
128 void add( func_t f ) { m_queue->put( f ); }
|
|
|
129 private:
|
|
|
130 pfc::objPool<pfc::event> m_eventPool;
|
|
|
131 pfc::objPool<abort_callback_impl> m_abortPool;
|
|
|
132 std::shared_ptr<queue_t> m_queue = std::make_shared<queue_t>();
|
|
|
133 typedef std::list<func_t> atExit_t;
|
|
|
134 std::shared_ptr<atExit_t> m_atExit = std::make_shared< atExit_t >();
|
|
|
135 };
|
|
|
136 }
|