blob: 8a8543058e242e3e1d598f37e216cfea4fabebf0 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
/* threaded_queue.h
Written by Robert Collins <rbtcollins@hotmail.com>
This file is part of Cygwin.
This software is a copyrighted work licensed under the terms of the
Cygwin license. Please consult the file "CYGWIN_LICENSE" for
details. */
#ifndef _THREADED_QUEUE_
#define _THREADED_QUEUE_
/*****************************************************************************/
/* a specific request */
class queue_request
{
public:
queue_request *_next;
queue_request () : _next (NULL) {}
virtual ~queue_request ();
virtual void process () = 0;
};
/*****************************************************************************/
/* a queue to allocate requests from n submission loops to x worker threads */
class queue_submission_loop;
class threaded_queue
{
public:
threaded_queue (size_t initial_workers = 1);
~threaded_queue ();
void add_submission_loop (queue_submission_loop *);
bool running () const { return _running; }
bool start ();
bool stop ();
void add (queue_request *);
private:
LONG _workers_count;
bool _running;
queue_submission_loop *_submitters_head;
long _requests_count; // Informational only.
queue_request *_requests_head;
CRITICAL_SECTION _queue_lock;
HANDLE _requests_sem; // == _requests_count
static DWORD WINAPI start_routine (LPVOID /* this */);
void create_workers (size_t initial_workers);
void worker_loop ();
};
/*****************************************************************************/
/* parameters for a request finding and submitting loop */
class queue_submission_loop
{
friend class threaded_queue;
public:
queue_submission_loop (threaded_queue *, bool ninterruptible);
virtual ~queue_submission_loop ();
bool start ();
bool stop ();
threaded_queue *queue () { return _queue; };
protected:
bool _running;
HANDLE _interrupt_event;
threaded_queue *const _queue;
private:
bool _interruptible;
HANDLE _hThread;
DWORD _tid;
queue_submission_loop *_next;
static DWORD WINAPI start_routine (LPVOID /* this */);
virtual void request_loop () = 0;
};
#ifdef __cplusplus
/*---------------------------------------------------------------------------*
* Some type-safe versions of the various interlocked functions.
*---------------------------------------------------------------------------*/
template <typename T> T *
TInterlockedExchangePointer (T **lvalue, T *rvalue)
{
return reinterpret_cast<T *>
(InterlockedExchangePointer (reinterpret_cast<void **> (lvalue),
reinterpret_cast<void *> (rvalue)));
}
template <typename T> T *
TInterlockedCompareExchangePointer (T **lvalue, T *rvalue1, T *rvalue2)
{
return reinterpret_cast<T *>
(InterlockedCompareExchangePointer (reinterpret_cast<void **> (lvalue),
reinterpret_cast<void *> (rvalue1),
reinterpret_cast<void *> (rvalue2)));
}
#endif /* __cplusplus */
#endif /* _THREADED_QUEUE_ */
|