summaryrefslogtreecommitdiffstats
path: root/winsup/cygserver/threaded_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'winsup/cygserver/threaded_queue.cc')
-rw-r--r--winsup/cygserver/threaded_queue.cc249
1 files changed, 249 insertions, 0 deletions
diff --git a/winsup/cygserver/threaded_queue.cc b/winsup/cygserver/threaded_queue.cc
new file mode 100644
index 000000000..5fb22b191
--- /dev/null
+++ b/winsup/cygserver/threaded_queue.cc
@@ -0,0 +1,249 @@
+/* threaded_queue.cc
+
+ Copyright 2001 Red Hat Inc.
+
+ Written by Robert Collins <rbtcollins@hotmail.com>
+
+ This file is part of Cygwin.
+
+ This software is a copyrighted work licensed under the terms of the
+ Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+ details. */
+
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <windows.h>
+#include <sys/types.h>
+#include "wincap.h"
+#include "threaded_queue.h"
+#define DEBUG 1
+#define debug_printf if (DEBUG) printf
+
+/* threaded_queue */
+
+DWORD WINAPI
+worker_function (LPVOID LpParam)
+{
+ class threaded_queue *queue = (class threaded_queue *) LpParam;
+ class queue_request *request;
+ /* FIXME use a threadsafe pop instead for speed? */
+ while (queue->active)
+ {
+ EnterCriticalSection (&queue->queuelock);
+ while (!queue->request && queue->active)
+ {
+ LeaveCriticalSection (&queue->queuelock);
+ DWORD rc = WaitForSingleObject (queue->event, INFINITE);
+ if (rc == WAIT_FAILED)
+ {
+ printf ("Wait for event failed\n");
+ queue->running--;
+ ExitThread (0);
+ }
+ EnterCriticalSection (&queue->queuelock);
+ }
+ if (!queue->active)
+ {
+ queue->running--;
+ LeaveCriticalSection (&queue->queuelock);
+ ExitThread (0);
+ }
+ /* not needed, but it is efficient */
+ request =
+ (class queue_request *) InterlockedExchangePointer (&queue->request,
+ queue->request->
+ next);
+ LeaveCriticalSection (&queue->queuelock);
+ request->process ();
+ delete request;
+ }
+ queue->running--;
+ ExitThread (0);
+}
+
+void
+threaded_queue::create_workers ()
+{
+ InitializeCriticalSection (&queuelock);
+ if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ {
+ printf ("Failed to create event queue (%lu), terminating\n",
+ GetLastError ());
+ exit (1);
+ }
+ active = true;
+
+ /* FIXME: Use a stack pair and create threads on the fly whenever
+ * we have to to service a request.
+ */
+ for (unsigned int i = 0; i < initial_workers; i++)
+ {
+ HANDLE hThread;
+ DWORD tid;
+ hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
+ if (hThread == NULL)
+ {
+ printf ("Failed to create thread (%lu), terminating\n",
+ GetLastError ());
+ exit (1);
+ }
+ CloseHandle (hThread);
+ running++;
+ }
+}
+
+void
+threaded_queue::cleanup ()
+{
+ /* harvest the threads */
+ active = false;
+ /* kill the request processing loops */
+ queue_process_param *reqloop;
+ /* make sure we don't race with a incoming request creation */
+ EnterCriticalSection (&queuelock);
+ reqloop =
+ (queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
+ while (reqloop)
+ {
+ queue_process_param *t = reqloop;
+ reqloop = reqloop->next;
+ delete t;
+ }
+ LeaveCriticalSection (&queuelock);
+ if (!running)
+ return;
+ printf ("Waiting for current queue threads to terminate\n");
+ for (int n = running; n; n--)
+ PulseEvent (event);
+ while (running)
+ sleep (1);
+ DeleteCriticalSection (&queuelock);
+ CloseHandle (event);
+}
+
+/* FIXME: return success or failure */
+void
+threaded_queue::add (queue_request * therequest)
+{
+ /* safe to not "Try" because workers don't hog this, they wait on the event
+ */
+ EnterCriticalSection (&queuelock);
+ if (!running)
+ {
+ printf ("No worker threads to handle request!\n");
+ }
+ if (!request)
+ request = therequest;
+ else
+ {
+ /* add to the queue end. */
+ queue_request *listrequest = request;
+ while (listrequest->next)
+ listrequest = listrequest->next;
+ listrequest->next = therequest;
+ }
+ PulseEvent (event);
+ LeaveCriticalSection (&queuelock);
+}
+
+/* FIXME: return success or failure rather than quitting */
+void
+threaded_queue::process_requests (queue_process_param * params,
+ threaded_queue_thread_function *
+ request_loop)
+{
+ if (params->start (request_loop, this) == false)
+ exit (1);
+ params->next =
+ (queue_process_param *) InterlockedExchangePointer (&process_head,
+ params);
+}
+
+/* queue_process_param */
+/* How does a constructor return an error? */
+queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
+interruptible
+(ninterruptible)
+{
+ if (!interruptible)
+ return;
+ debug_printf ("creating an interruptible processing thread\n");
+ if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
+ {
+ printf ("Failed to create interrupt event (%lu), terminating\n",
+ GetLastError ());
+ exit (1);
+ }
+}
+
+queue_process_param::~queue_process_param ()
+{
+ if (running)
+ stop ();
+ if (!interruptible)
+ return;
+ CloseHandle (interrupt);
+}
+
+bool
+ queue_process_param::start (threaded_queue_thread_function * request_loop,
+ threaded_queue * thequeue)
+{
+ queue = thequeue;
+ hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
+ if (hThread)
+ {
+ running = true;
+ return true;
+ }
+ printf ("Failed to create thread (%lu), terminating\n", GetLastError ());
+ return false;
+}
+
+void
+queue_process_param::stop ()
+{
+ if (interruptible)
+ {
+ InterlockedExchange (&shutdown, true);
+ PulseEvent (interrupt);
+ /* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
+ * scheduled again, we print an error and exit. We _should_ loop or
+ * try resignalling. We don't want to hand here though...
+ */
+ int n = 5;
+ while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
+ if (!n)
+ {
+ printf ("Process thread didn't shutdown cleanly after 200ms!\n");
+ exit (1);
+ }
+ else
+ running = false;
+ }
+ else
+ {
+ printf ("killing request loop thread %ld\n", tid);
+ int rc;
+ if (!(rc = TerminateThread (hThread, 0)))
+ {
+ printf ("error shutting down request loop worker thread\n");
+ }
+ running = false;
+ }
+ CloseHandle (hThread);
+}
+
+/* queue_request */
+queue_request::queue_request ():next (NULL)
+{
+}
+
+void
+queue_request::process (void)
+{
+ printf ("\n**********************************************\n"
+ "Oh no! we've hit the base queue_request process() function, and this indicates a coding\n"
+ "fault !!!\n" "***********************************************\n");
+}