From 0f2b039e71b74256c0a3c6becf241750993a91ba Mon Sep 17 00:00:00 2001 From: Arthur Bols Date: Sun, 28 Mar 2021 17:12:07 +0200 Subject: [PATCH] Fix issues with shutdown, improve documentation --- server/command.py | 3 ++ server/requesthandler.py | 5 ++- server/worker.py | 70 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/server/command.py b/server/command.py index bc39f83..18926e8 100644 --- a/server/command.py +++ b/server/command.py @@ -66,6 +66,9 @@ class AbstractCommand(ABC): def _build_message(self, status: int, content_type: str, body: bytes, extra_headers=None): + if extra_headers is None: + extra_headers = {} + self._process_conditional_headers() message = f"HTTP/1.1 {status} {status_message[status]}\r\n" diff --git a/server/requesthandler.py b/server/requesthandler.py index 7fb40fb..6079e39 100644 --- a/server/requesthandler.py +++ b/server/requesthandler.py @@ -17,7 +17,10 @@ METHODS = ("GET", "HEAD", "PUT", "POST") class RequestHandler: """ - Processes incoming HTTP request messages. + A RequestHandler instance processes incoming HTTP request messages from a single client. + + RequestHandler instances are created every time a client connects. They will read the incoming + messages, parse and verify them, and send a response. """ conn: HTTPSocket diff --git a/server/worker.py b/server/worker.py index 28e37ca..e4440f9 100644 --- a/server/worker.py +++ b/server/worker.py @@ -3,6 +3,7 @@ import multiprocessing as mp import socket import threading from concurrent.futures import ThreadPoolExecutor +from typing import Dict from httplib.exceptions import HTTPServerException, InternalServerError, HTTPServerCloseException from server.requesthandler import RequestHandler @@ -18,11 +19,19 @@ def worker(address, name, logging_level, queue: mp.Queue, stop_event: mp.Event): try: runner.run() except KeyboardInterrupt: + # Catch exit signals and close the threads appropriately. 
logging.debug("Ctrl+C pressed, terminating") runner.shutdown() class Worker: + """ + A Worker instance represents a parallel execution process to handle incoming connections. + + Worker instances are created when the HTTP server starts. They are used to handle many incoming connections + asynchronously. + """ + host: str name: str queue: mp.Queue @@ -30,19 +39,35 @@ class Worker: stop_event: mp.Event finished_queue: mp.Queue + dispatched_sockets: Dict[int, socket.socket] def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event): + """ + Create a new Worker instance + + @param host: The hostname of the HTTP server + @param name: The name of this Worker instance + @param queue: The dispatch queue for incoming socket connections + @param stop_event: The Event that signals when to shutdown this worker. + """ self.host = host self.name = name self.queue = queue self.executor = ThreadPoolExecutor(THREAD_LIMIT) self.stop_event = stop_event self.finished_queue = mp.Queue() + self.dispatched_sockets = {} for i in range(THREAD_LIMIT): self.finished_queue.put(i) def run(self): + """ + Run this worker. + + The worker will start waiting for incoming clients being added to the queue and submit them to + the executor. + """ while not self.stop_event.is_set(): # Blocks until thread is free @@ -61,6 +86,26 @@ class Worker: self.shutdown() def _handle_client(self, conn: socket.socket, addr): + """ + Target method for the worker threads. + Creates a RequestHandler and handles any exceptions which may occur. + + @param conn: The client socket + @param addr: The address of the client. 
+ """ + + self.dispatched_sockets[threading.get_ident()] = conn + try: + self.__do_handle_client(conn, addr) + except Exception: + if not self.stop_event.is_set(): + logging.debug("Internal error in thread:", exc_info=True) + + self.dispatched_sockets.pop(threading.get_ident()) + # Finished, put back into queue + self.finished_queue.put(threading.get_ident()) + + def __do_handle_client(self, conn: socket.socket, addr): handler = RequestHandler(conn, self.host) @@ -68,29 +113,48 @@ class Worker: try: handler.listen() except HTTPServerCloseException as e: + # Exception raised after which the client should be disconnected. logging.warning("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg) RequestHandler.send_error(conn, e.status_code, e.message) + break except HTTPServerException as e: + # Normal HTTP exception raised (e.g. 404), continue listening. logging.debug("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg) RequestHandler.send_error(conn, e.status_code, e.message) except socket.timeout: + # socket timed out, disconnect. logging.info("Socket for client %s timed out.", addr) break except ConnectionAbortedError: + # Client aborted connection logging.info("Socket for client %s disconnected.", addr) break except Exception as e: + # Unexpected exception raised. Send 500 and disconnect. 
logging.error("Internal error", exc_info=e) RequestHandler.send_error(conn, InternalServerError.status_code, InternalServerError.message) break - logging.info("Closing socket for client %s", addr) conn.shutdown(socket.SHUT_RDWR) conn.close() - # Finished, put back into queue - self.finished_queue.put(threading.get_ident()) def shutdown(self): logging.info("shutting down") + # shutdown executor, but do not wait + self.executor.shutdown(False) + + logging.info("Closing sockets") + # Copy dictionary to prevent issues with concurrency + clients = self.dispatched_sockets.copy().values() + for client in clients: + client: socket.socket + try: + client.shutdown(socket.SHUT_RDWR) + client.close() + except OSError: + # Ignore exception due to already closed sockets + pass + + # Call shutdown again and wait this time self.executor.shutdown()