diff options
Diffstat (limited to 'winsup/cygserver/threaded_queue.cc')
-rw-r--r-- | winsup/cygserver/threaded_queue.cc | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc new file mode 100644 index 000000000..5fb22b191 --- /dev/null +++ b/winsup/cygserver/threaded_queue.cc @@ -0,0 +1,249 @@ +/* threaded_queue.cc + + Copyright 2001 Red Hat Inc. + + Written by Robert Collins <rbtcollins@hotmail.com> + + This file is part of Cygwin. + + This software is a copyrighted work licensed under the terms of the + Cygwin license. Please consult the file "CYGWIN_LICENSE" for + details. */ + +#include <errno.h> +#include <stdio.h> +#include <unistd.h> +#include <windows.h> +#include <sys/types.h> +#include "wincap.h" +#include "threaded_queue.h" +#define DEBUG 1 +#define debug_printf if (DEBUG) printf + +/* threaded_queue */ + +DWORD WINAPI +worker_function (LPVOID LpParam) +{ + class threaded_queue *queue = (class threaded_queue *) LpParam; + class queue_request *request; + /* FIXME use a threadsafe pop instead for speed? */ + while (queue->active) + { + EnterCriticalSection (&queue->queuelock); + while (!queue->request && queue->active) + { + LeaveCriticalSection (&queue->queuelock); + DWORD rc = WaitForSingleObject (queue->event, INFINITE); + if (rc == WAIT_FAILED) + { + printf ("Wait for event failed\n"); + queue->running--; + ExitThread (0); + } + EnterCriticalSection (&queue->queuelock); + } + if (!queue->active) + { + queue->running--; + LeaveCriticalSection (&queue->queuelock); + ExitThread (0); + } + /* not needed, but it is efficient */ + request = + (class queue_request *) InterlockedExchangePointer (&queue->request, + queue->request-> + next); + LeaveCriticalSection (&queue->queuelock); + request->process (); + delete request; + } + queue->running--; + ExitThread (0); +} + +void +threaded_queue::create_workers () +{ + InitializeCriticalSection (&queuelock); + if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + { + printf ("Failed to create event queue (%lu), terminating\n", + GetLastError ()); + exit (1); + } + active = true; + + /* FIXME: Use a stack pair and create threads on the fly whenever + * we have to to service a request. + */ + for (unsigned int i = 0; i < initial_workers; i++) + { + HANDLE hThread; + DWORD tid; + hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid); + if (hThread == NULL) + { + printf ("Failed to create thread (%lu), terminating\n", + GetLastError ()); + exit (1); + } + CloseHandle (hThread); + running++; + } +} + +void +threaded_queue::cleanup () +{ + /* harvest the threads */ + active = false; + /* kill the request processing loops */ + queue_process_param *reqloop; + /* make sure we don't race with a incoming request creation */ + EnterCriticalSection (&queuelock); + reqloop = + (queue_process_param *) InterlockedExchangePointer (&process_head, NULL); + while (reqloop) + { + queue_process_param *t = reqloop; + reqloop = reqloop->next; + delete t; + } + LeaveCriticalSection (&queuelock); + if (!running) + return; + printf ("Waiting for current queue threads to terminate\n"); + for (int n = running; n; n--) + PulseEvent (event); + while (running) + sleep (1); + DeleteCriticalSection (&queuelock); + CloseHandle (event); +} + +/* FIXME: return success or failure */ +void +threaded_queue::add (queue_request * therequest) +{ + /* safe to not "Try" because workers don't hog this, they wait on the event + */ + EnterCriticalSection (&queuelock); + if (!running) + { + printf ("No worker threads to handle request!\n"); + } + if (!request) + request = therequest; + else + { + /* add to the queue end. */ + queue_request *listrequest = request; + while (listrequest->next) + listrequest = listrequest->next; + listrequest->next = therequest; + } + PulseEvent (event); + LeaveCriticalSection (&queuelock); +} + +/* FIXME: return success or failure rather than quitting */ +void +threaded_queue::process_requests (queue_process_param * params, + threaded_queue_thread_function * + request_loop) +{ + if (params->start (request_loop, this) == false) + exit (1); + params->next = + (queue_process_param *) InterlockedExchangePointer (&process_head, + params); +} + +/* queue_process_param */ +/* How does a constructor return an error? */ +queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false), +interruptible +(ninterruptible) +{ + if (!interruptible) + return; + debug_printf ("creating an interruptible processing thread\n"); + if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL) + { + printf ("Failed to create interrupt event (%lu), terminating\n", + GetLastError ()); + exit (1); + } +} + +queue_process_param::~queue_process_param () +{ + if (running) + stop (); + if (!interruptible) + return; + CloseHandle (interrupt); +} + +bool + queue_process_param::start (threaded_queue_thread_function * request_loop, + threaded_queue * thequeue) +{ + queue = thequeue; + hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid); + if (hThread) + { + running = true; + return true; + } + printf ("Failed to create thread (%lu), terminating\n", GetLastError ()); + return false; +} + +void +queue_process_param::stop () +{ + if (interruptible) + { + InterlockedExchange (&shutdown, true); + PulseEvent (interrupt); + /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get + * scheduled again, we print an error and exit. We _should_ loop or + * try resignalling. We don't want to hand here though... + */ + int n = 5; + while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT); + if (!n) + { + printf ("Process thread didn't shutdown cleanly after 200ms!\n"); + exit (1); + } + else + running = false; + } + else + { + printf ("killing request loop thread %ld\n", tid); + int rc; + if (!(rc = TerminateThread (hThread, 0))) + { + printf ("error shutting down request loop worker thread\n"); + } + running = false; + } + CloseHandle (hThread); +} + +/* queue_request */ +queue_request::queue_request ():next (NULL) +{ +} + +void +queue_request::process (void) +{ + printf ("\n**********************************************\n" + "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n" + "fault !!!\n" "***********************************************\n"); +} |