diff options
Diffstat (limited to 'winsup/cygwin/threaded_queue.cc')
-rwxr-xr-x | winsup/cygwin/threaded_queue.cc | 500 |
1 files changed, 329 insertions, 171 deletions
diff --git a/winsup/cygwin/threaded_queue.cc b/winsup/cygwin/threaded_queue.cc index 321fa1612..ba0fe4178 100755 --- a/winsup/cygwin/threaded_queue.cc +++ b/winsup/cygwin/threaded_queue.cc @@ -4,247 +4,405 @@ Written by Robert Collins <rbtcollins@hotmail.com> - This file is part of Cygwin. +This file is part of Cygwin. - This software is a copyrighted work licensed under the terms of the - Cygwin license. Please consult the file "CYGWIN_LICENSE" for - details. */ +This software is a copyrighted work licensed under the terms of the +Cygwin license. Please consult the file "CYGWIN_LICENSE" for +details. */ +#include "woutsup.h" + +#include <assert.h> #include <errno.h> #include <stdio.h> #include <unistd.h> -#include <windows.h> #include <sys/types.h> #include <stdlib.h> -#include "wincap.h" #include "threaded_queue.h" -#define DEBUG 1 -#define debug_printf if (DEBUG) printf + +/*****************************************************************************/ + +/* queue_request */ + +queue_request::~queue_request () +{} + +/*****************************************************************************/ /* threaded_queue */ -DWORD WINAPI -worker_function (LPVOID LpParam) +threaded_queue::threaded_queue (const size_t initial_workers) + : _workers_count (0), + _running (false), + _submitters_head (NULL), + _requests_count (0), + _requests_head (NULL), + _requests_sem (NULL) { - class threaded_queue *queue = (class threaded_queue *) LpParam; - class queue_request *request; - /* FIXME use a threadsafe pop instead for speed? */ - while (queue->active) + InitializeCriticalSection (&_queue_lock); + + // This semaphore's count is the number of requests on the queue. + // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS + // multiplied by max. threads per process (2028?), which is (a few) + // more requests than could ever be pending with the current design. + + _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES + 0, // Initial count + 129792, // Maximum count + NULL); // Anonymous + + if (!_requests_sem) { - EnterCriticalSection (&queue->queuelock); - while (!queue->request && queue->active) - { - LeaveCriticalSection (&queue->queuelock); - DWORD rc = WaitForSingleObject (queue->event, INFINITE); - if (rc == WAIT_FAILED) - { - printf ("Wait for event failed\n"); - queue->running--; - ExitThread (0); - } - EnterCriticalSection (&queue->queuelock); - } - if (!queue->active) - { - queue->running--; - LeaveCriticalSection (&queue->queuelock); - ExitThread (0); - } - /* not needed, but it is efficient */ - request = - (class queue_request *) InterlockedExchangePointer (&queue->request, - queue->request-> - next); - LeaveCriticalSection (&queue->queuelock); - request->process (); - delete request; + system_printf (("failed to create the request queue semaphore, " + "error = %lu"), + GetLastError ()); + abort (); } - queue->running--; - ExitThread (0); + + create_workers (initial_workers); } -void -threaded_queue::create_workers () +threaded_queue::~threaded_queue () { - InitializeCriticalSection (&queuelock); - if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + if (_running) + stop (); + + debug_printf ("deleting all pending queue requests"); + queue_request *reqptr = _requests_head; + while (reqptr) { - printf ("Failed to create event queue (%lu), terminating\n", - GetLastError ()); - exit (1); + queue_request *const ptr = reqptr; + reqptr = reqptr->_next; + safe_delete (ptr); } - active = true; - /* FIXME: Use a stack pair and create threads on the fly whenever - * we have to to service a request. - */ - for (unsigned int i = 0; i < initial_workers; i++) + DeleteCriticalSection (&_queue_lock); + if (_requests_sem) + (void) CloseHandle (_requests_sem); +} + +/* FIXME: return success or failure rather than quitting */ +void +threaded_queue::add_submission_loop (queue_submission_loop *const submitter) +{ + assert (this); + assert (submitter); + assert (submitter->_queue == this); + assert (!submitter->_next); + + submitter->_next = + TInterlockedExchangePointer (&_submitters_head, submitter); + + if (_running) + submitter->start (); +} + +bool +threaded_queue::start () +{ + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = true; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (!was_running) { - HANDLE hThread; - DWORD tid; - hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); - if (hThread == NULL) + debug_printf ("starting all queue submission loops"); + + while (loopptr) { - printf ("Failed to create thread (%lu), terminating\n", - GetLastError ()); - exit (1); + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->start (); } - CloseHandle (hThread); - running++; } + + return was_running; } -void -threaded_queue::cleanup () +bool +threaded_queue::stop () { - /* harvest the threads */ - active = false; - /* kill the request processing loops */ - queue_process_param *reqloop; - /* make sure we don't race with a incoming request creation */ - EnterCriticalSection (&queuelock); - reqloop = - (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); - while (reqloop) + EnterCriticalSection (&_queue_lock); + const bool was_running = _running; + _running = false; + queue_submission_loop *loopptr = _submitters_head; + LeaveCriticalSection (&_queue_lock); + + if (was_running) { - queue_process_param *t = reqloop; - reqloop = reqloop->next; - delete t; + debug_printf ("stopping all queue submission loops"); + while (loopptr) + { + queue_submission_loop *const ptr = loopptr; + loopptr = loopptr->_next; + ptr->stop (); + } + + ReleaseSemaphore (_requests_sem, _workers_count, NULL); + while (_workers_count) + { + debug_printf (("waiting for worker threads to terminate: " + "%lu still running"), + _workers_count); + Sleep (1000); + } + debug_printf ("all worker threads have terminated"); } - LeaveCriticalSection (&queuelock); - if (!running) - return; - printf ("Waiting for current queue threads to terminate\n"); - for (int n = running; n; n--) - PulseEvent (event); - while (running) - sleep (1); - DeleteCriticalSection (&queuelock); - CloseHandle (event); + + return was_running; } /* FIXME: return success or failure */ void -threaded_queue::add (queue_request * therequest) +threaded_queue::add (queue_request *const therequest) { - /* safe to not "Try" because workers don't hog this, they wait on the event - */ - EnterCriticalSection (&queuelock); - if (!running) + assert (this); + assert (therequest); + assert (!therequest->_next); + + if (!_workers_count) { - printf ("No worker threads to handle request!\n"); + system_printf ("warning: no worker threads to handle request!"); + // FIXME: And then what? } - if (!request) - request = therequest; + + EnterCriticalSection (&_queue_lock); + if (!_requests_head) + _requests_head = therequest; else { - /* add to the queue end. */ - queue_request *listrequest = request; - while (listrequest->next) - listrequest = listrequest->next; - listrequest->next = therequest; + /* Add to the queue end. */ + queue_request *reqptr = _requests_head; + for (; reqptr->_next; reqptr = reqptr->_next) + {} + assert (reqptr); + assert (!reqptr->_next); + reqptr->_next = therequest; } - PulseEvent (event); - LeaveCriticalSection (&queuelock); + + _requests_count += 1; + assert (_requests_count > 0); + LeaveCriticalSection (&_queue_lock); + + (void) ReleaseSemaphore (_requests_sem, 1, NULL); } -/* FIXME: return success or failure rather than quitting */ +/*static*/ DWORD WINAPI +threaded_queue::start_routine (const LPVOID lpParam) +{ + class threaded_queue *const queue = (class threaded_queue *) lpParam; + assert (queue); + + queue->worker_loop (); + + const long count = InterlockedDecrement (&queue->_workers_count); + assert (count >= 0); + + if (queue->_running) + debug_printf ("worker loop has exited; thread about to terminate"); + + return 0; +} + +/* Called from the constructor: so no need to be thread-safe until the + * worker threads start to be created; thus the interlocked increment + * of the `_workers_count' field. + */ + void -threaded_queue::process_requests (queue_process_param * params, - threaded_queue_thread_function * - request_loop) +threaded_queue::create_workers (const size_t initial_workers) { - if (params->start (request_loop, this) == false) - exit (1); - params->next = - (queue_process_param *) InterlockedExchangePointer (&process_head, - params); + assert (initial_workers > 0); + + for (unsigned int i = 0; i != initial_workers; i++) + { + const long count = InterlockedIncrement (&_workers_count); + assert (count > 0); + + DWORD tid; + const HANDLE hThread = + CreateThread (NULL, 0, start_routine, this, 0, &tid); + + if (!hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } + + (void) CloseHandle (hThread); + } } -/* queue_process_param */ -/* How does a constructor return an error? */ -queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), -interruptible -(ninterruptible) +void +threaded_queue::worker_loop () { - if (!interruptible) - return; - debug_printf ("creating an interruptible processing thread\n"); - if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + while (true) { - printf ("Failed to create interrupt event (%lu), terminating\n", - GetLastError ()); - exit (1); + const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE); + if (rc == WAIT_FAILED) + { + system_printf ("wait for request semaphore failed, error = %lu", + GetLastError ()); + return; + } + assert (rc == WAIT_OBJECT_0); + + EnterCriticalSection (&_queue_lock); + if (!_running) + { + LeaveCriticalSection (&_queue_lock); + return; + } + + assert (_requests_head); + queue_request *const reqptr = _requests_head; + _requests_head = reqptr->_next; + + _requests_count -= 1; + assert (_requests_count >= 0); + LeaveCriticalSection (&_queue_lock); + + assert (reqptr); + reqptr->process (); + safe_delete (reqptr); + } +} + +/*****************************************************************************/ + +/* queue_submission_loop */ + +queue_submission_loop::queue_submission_loop (threaded_queue *const queue, + const bool ninterruptible) + : _running (false), + _interrupt_event (NULL), + _queue (queue), + _interruptible (ninterruptible), + _hThread (NULL), + _tid (0), + _next (NULL) +{ + if (_interruptible) + { + // verbose: debug_printf ("creating an interruptible processing thread"); + + _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES + FALSE, // Auto-reset + FALSE, // Initially non-signalled + NULL); // Anonymous + + if (!_interrupt_event) + { + system_printf ("failed to create interrupt event, error = %lu", + GetLastError ()); + abort (); + } } } -queue_process_param::~queue_process_param () +queue_submission_loop::~queue_submission_loop () { - if (running) + if (_running) stop (); - if (!interruptible) - return; - CloseHandle (interrupt); + if (_interrupt_event) + (void) CloseHandle (_interrupt_event); + if (_hThread) + (void) CloseHandle (_hThread); } bool - queue_process_param::start (threaded_queue_thread_function * request_loop, - threaded_queue * thequeue) +queue_submission_loop::start () { - queue = thequeue; - hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); - if (hThread) + assert (this); + assert (!_hThread); + + const bool was_running = _running; + + if (!was_running) { - running = true; - return true; + _running = true; + + _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid); + if (!_hThread) + { + system_printf ("failed to create thread, error = %lu", + GetLastError ()); + abort (); + } } - printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); - return false; + + return was_running; } -void -queue_process_param::stop () +bool +queue_submission_loop::stop () { - if (interruptible) + assert (this); + assert (_hThread && _hThread != INVALID_HANDLE_VALUE); + + const bool was_running = _running; + + if (_running) { - InterlockedExchange (&shutdown, true); - PulseEvent (interrupt); - /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get - * scheduled again, we print an error and exit. We _should_ loop or - * try resignalling. We don't want to hand here though... - */ - int n = 5; - while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); - if (!n) + _running = false; + + if (_interruptible) { - printf ("Process thread didn't shutdown cleanly after 200ms!\n"); - exit (1); + assert (_interrupt_event + && _interrupt_event != INVALID_HANDLE_VALUE); + + SetEvent (_interrupt_event); + + if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT) + { + system_printf (("request loop thread %lu failed to shutdown " + "when asked politely: about to get heavy"), + _tid); + + if (!TerminateThread (_hThread, 0)) + { + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); + abort (); + } + } } else - running = false; - } - else - { - printf ("killing request loop thread %ld\n", tid); - int rc; - if (!(rc = TerminateThread (hThread, 0))) { - printf ("error shutting down request loop worker thread\n"); + // FIXME: could wait to see if the request loop notices that + // the submission loop is no longer running and shuts down + // voluntarily. + + debug_printf ("killing request loop thread %lu", _tid); + + if (!TerminateThread (_hThread, 0)) + system_printf (("failed to kill request loop thread %lu" + ", error = %lu"), + _tid, GetLastError ()); } - running = false; } - CloseHandle (hThread); -} -/* queue_request */ -queue_request::queue_request ():next (NULL) -{ + return was_running; } -void -queue_request::process (void) +/*static*/ DWORD WINAPI +queue_submission_loop::start_routine (const LPVOID lpParam) { - printf ("\n**********************************************\n" - "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n" - "fault !!!\n" "***********************************************\n"); + class queue_submission_loop *const submission_loop = + (class queue_submission_loop *) lpParam; + assert (submission_loop); + + submission_loop->request_loop (); + + debug_printf ("submission loop has exited; thread about to terminate"); + + submission_loop->stop (); + + return 0; } + +/*****************************************************************************/ |