Fix issues with shutdown, improve documentation

This commit is contained in:
2021-03-28 17:12:07 +02:00
parent 210c03b73f
commit 0f2b039e71
3 changed files with 74 additions and 4 deletions

View File

@@ -66,6 +66,9 @@ class AbstractCommand(ABC):
def _build_message(self, status: int, content_type: str, body: bytes, extra_headers=None): def _build_message(self, status: int, content_type: str, body: bytes, extra_headers=None):
if extra_headers is None:
extra_headers = {}
self._process_conditional_headers() self._process_conditional_headers()
message = f"HTTP/1.1 {status} {status_message[status]}\r\n" message = f"HTTP/1.1 {status} {status_message[status]}\r\n"

View File

@@ -17,7 +17,10 @@ METHODS = ("GET", "HEAD", "PUT", "POST")
class RequestHandler: class RequestHandler:
""" """
Processes incoming HTTP request messages. A RequestHandler instance processes incoming HTTP requests messages from a single client.
RequestHandler instances are created everytime a client connects. They will read the incoming
messages, parse, verify them and send a respond.
""" """
conn: HTTPSocket conn: HTTPSocket

View File

@@ -3,6 +3,7 @@ import multiprocessing as mp
import socket import socket
import threading import threading
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from typing import Dict
from httplib.exceptions import HTTPServerException, InternalServerError, HTTPServerCloseException from httplib.exceptions import HTTPServerException, InternalServerError, HTTPServerCloseException
from server.requesthandler import RequestHandler from server.requesthandler import RequestHandler
@@ -18,11 +19,19 @@ def worker(address, name, logging_level, queue: mp.Queue, stop_event: mp.Event):
try: try:
runner.run() runner.run()
except KeyboardInterrupt: except KeyboardInterrupt:
# Catch exit signals and close the threads appropriately.
logging.debug("Ctrl+C pressed, terminating") logging.debug("Ctrl+C pressed, terminating")
runner.shutdown() runner.shutdown()
class Worker: class Worker:
"""
A Worker instance represents a parallel execution process to handle incoming connections.
Worker instances are created when the HTTP server starts. They are used to handle many incoming connections
asynchronously.
"""
host: str host: str
name: str name: str
queue: mp.Queue queue: mp.Queue
@@ -30,19 +39,35 @@ class Worker:
stop_event: mp.Event stop_event: mp.Event
finished_queue: mp.Queue finished_queue: mp.Queue
dispatched_sockets: Dict[int, socket.socket]
def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event): def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event):
"""
Create a new Worker instance
@param host: The hostname of the HTTP server
@param name: The name of this Worker instance
@param queue: The dispatch queue for incoming socket connections
@param stop_event: The Event that signals when to shutdown this worker.
"""
self.host = host self.host = host
self.name = name self.name = name
self.queue = queue self.queue = queue
self.executor = ThreadPoolExecutor(THREAD_LIMIT) self.executor = ThreadPoolExecutor(THREAD_LIMIT)
self.stop_event = stop_event self.stop_event = stop_event
self.finished_queue = mp.Queue() self.finished_queue = mp.Queue()
self.dispatched_sockets = {}
for i in range(THREAD_LIMIT): for i in range(THREAD_LIMIT):
self.finished_queue.put(i) self.finished_queue.put(i)
def run(self): def run(self):
"""
Run this worker.
The worker will start waiting for incoming clients being added to the queue and submit them to
the executor.
"""
while not self.stop_event.is_set(): while not self.stop_event.is_set():
# Blocks until thread is free # Blocks until thread is free
@@ -61,6 +86,26 @@ class Worker:
self.shutdown() self.shutdown()
def _handle_client(self, conn: socket.socket, addr): def _handle_client(self, conn: socket.socket, addr):
"""
Target method for the worker threads.
Creates a RequestHandler and handles any exceptions which may occur.
@param conn: The client socket
@param addr: The address of the client.
"""
self.dispatched_sockets[threading.get_ident()] = conn
try:
self.__do_handle_client(conn, addr)
except Exception:
if not self.stop_event:
logging.debug("Internal error in thread:", exc_info=True)
self.dispatched_sockets.pop(threading.get_ident())
# Finished, put back into queue
self.finished_queue.put(threading.get_ident())
def __do_handle_client(self, conn: socket.socket, addr):
handler = RequestHandler(conn, self.host) handler = RequestHandler(conn, self.host)
@@ -68,29 +113,48 @@ class Worker:
try: try:
handler.listen() handler.listen()
except HTTPServerCloseException as e: except HTTPServerCloseException as e:
# Exception raised after which the client should be disconnected.
logging.warning("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg) logging.warning("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg)
RequestHandler.send_error(conn, e.status_code, e.message) RequestHandler.send_error(conn, e.status_code, e.message)
break break
except HTTPServerException as e: except HTTPServerException as e:
# Normal HTTP exception raised (e.a. 404) continue listening.
logging.debug("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg) logging.debug("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg)
RequestHandler.send_error(conn, e.status_code, e.message) RequestHandler.send_error(conn, e.status_code, e.message)
except socket.timeout: except socket.timeout:
# socket timed out, disconnect.
logging.info("Socket for client %s timed out.", addr) logging.info("Socket for client %s timed out.", addr)
break break
except ConnectionAbortedError: except ConnectionAbortedError:
# Client aborted connection
logging.info("Socket for client %s disconnected.", addr) logging.info("Socket for client %s disconnected.", addr)
break break
except Exception as e: except Exception as e:
# Unexpected exception raised. Send 500 and disconnect.
logging.error("Internal error", exc_info=e) logging.error("Internal error", exc_info=e)
RequestHandler.send_error(conn, InternalServerError.status_code, InternalServerError.message) RequestHandler.send_error(conn, InternalServerError.status_code, InternalServerError.message)
break break
logging.info("Closing socket for client %s", addr)
conn.shutdown(socket.SHUT_RDWR) conn.shutdown(socket.SHUT_RDWR)
conn.close() conn.close()
# Finished, put back into queue
self.finished_queue.put(threading.get_ident())
def shutdown(self): def shutdown(self):
logging.info("shutting down") logging.info("shutting down")
# shutdown executor, but do not wait
self.executor.shutdown(False)
logging.info("Closing sockets")
# Copy dictionary to prevent issues with concurrency
clients = self.dispatched_sockets.copy().values()
for client in clients:
client: socket.socket
try:
client.shutdown(socket.SHUT_RDWR)
client.close()
except OSError:
# Ignore exception due to already closed sockets
pass
# Call shutdown again and wait this time
self.executor.shutdown() self.executor.shutdown()