diff options
Diffstat (limited to 'winsup/cygwin/threaded_queue.h')
-rwxr-xr-x | winsup/cygwin/threaded_queue.h | 140 |
1 files changed, 100 insertions, 40 deletions
diff --git a/winsup/cygwin/threaded_queue.h b/winsup/cygwin/threaded_queue.h index d72541e22..5b6fddc42 100755 --- a/winsup/cygwin/threaded_queue.h +++ b/winsup/cygwin/threaded_queue.h @@ -4,64 +4,124 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ #ifndef _THREADED_QUEUE_ #define _THREADED_QUEUE_ +/*****************************************************************************/ + /* a specific request */ class queue_request { - public: - class queue_request *next; - virtual void process (); - queue_request(); +public: + queue_request *_next; + + queue_request () : _next (NULL) {} + virtual ~queue_request (); + + virtual void process () = 0; }; +/*****************************************************************************/ -typedef DWORD WINAPI threaded_queue_thread_function (LPVOID); -/* parameters for a request finding and submitting loop */ +/* a queue to allocate requests from n submission loops to x worker threads */ -class queue_process_param +class queue_submission_loop; + +class threaded_queue { - public: - bool start (threaded_queue_thread_function *, class threaded_queue *); - void stop (); - bool running; - long int shutdown; - class queue_process_param * next; - class threaded_queue *queue; - queue_process_param (bool ninterruptible); - ~queue_process_param (); - bool interruptible; - HANDLE interrupt; - HANDLE hThread; - DWORD tid; +public: + threaded_queue (size_t initial_workers = 1); + ~threaded_queue (); + + void add_submission_loop (queue_submission_loop *); + + bool running () const { return _running; } + + bool start (); + bool stop (); + + void add (queue_request *); + +private: + long _workers_count; + bool _running; + + queue_submission_loop *_submitters_head; + + long _requests_count; // Informational only. + queue_request *_requests_head; + + CRITICAL_SECTION _queue_lock; + HANDLE _requests_sem; // == _requests_count + + static DWORD WINAPI start_routine (LPVOID /* this */); + + void create_workers (size_t initial_workers); + void worker_loop (); }; -/* a queue to allocate requests from n submission loops to x worker threads */ +/*****************************************************************************/ -class threaded_queue +/* parameters for a request finding and submitting loop */ + +class queue_submission_loop { - public: - CRITICAL_SECTION queuelock; - HANDLE event; - bool active; - queue_request * request; - unsigned int initial_workers; - unsigned int running; - void create_workers (); - void cleanup (); - void add (queue_request *); - void process_requests (queue_process_param *, threaded_queue_thread_function *); - threaded_queue () : active (false), request (NULL), initial_workers (1), running (0), process_head (NULL) {}; - private: - queue_request *process_head; + friend class threaded_queue; + +public: + queue_submission_loop (threaded_queue *, bool ninterruptible); + virtual ~queue_submission_loop (); + + bool start (); + bool stop (); + + threaded_queue *queue () { return _queue; }; + +protected: + bool _running; + HANDLE _interrupt_event; + threaded_queue *const _queue; + +private: + bool _interruptible; + HANDLE _hThread; + DWORD _tid; + queue_submission_loop *_next; + + static DWORD WINAPI start_routine (LPVOID /* this */); + virtual void request_loop () = 0; }; +#ifdef __cplusplus + +/*---------------------------------------------------------------------------* + * Some type-safe versions of the various interlocked functions. + *---------------------------------------------------------------------------*/ + +template <typename T> T * +TInterlockedExchangePointer (T **lvalue, T *rvalue) +{ + return reinterpret_cast<T *> + (InterlockedExchangePointer (reinterpret_cast<void **> (lvalue), + reinterpret_cast<void *> (rvalue))); +} + +template <typename T> T * +TInterlockedCompareExchangePointer (T **lvalue, T *rvalue1, T *rvalue2) +{ + return reinterpret_cast<T *> + (InterlockedCompareExchangePointer (reinterpret_cast<void **> (lvalue), + reinterpret_cast<void *> (rvalue1), + reinterpret_cast<void *> (rvalue2))); +} + +#endif /* __cplusplus */ + #endif /* _THREADED_QUEUE_ */ |