Files
CN2021/server/worker.py
2021-03-28 18:55:00 +02:00

163 lines
5.4 KiB
Python

import logging
import multiprocessing as mp
import socket
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict
from httplib.exceptions import HTTPServerException, InternalServerError, HTTPServerCloseException
from server.requesthandler import RequestHandler
THREAD_LIMIT = 128
def worker(address, name, logging_level, queue: mp.Queue, stop_event: mp.Event):
logging.basicConfig(level=logging_level, format="%(levelname)s:[WORKER " + str(name) + "] %(message)s")
runner = Worker(address, name, queue, stop_event)
logging.debug("started")
try:
runner.run()
except KeyboardInterrupt:
# Catch exit signals and close the threads appropriately.
logging.debug("Ctrl+C pressed, terminating")
runner.shutdown()
class Worker:
"""
A Worker instance represents a parallel execution process to handle incoming connections.
Worker instances are created when the HTTP server starts. They are used to handle many incoming connections
asynchronously.
"""
host: str
name: str
queue: mp.Queue
executor: ThreadPoolExecutor
stop_event: mp.Event
finished_queue: mp.Queue
dispatched_sockets: Dict[int, socket.socket]
def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event):
"""
Create a new Worker instance
@param host: The hostname of the HTTP server
@param name: The name of this Worker instance
@param queue: The dispatch queue for incoming socket connections
@param stop_event: The Event that signals when to shut down this worker.
"""
self.host = host
self.name = name
self.queue = queue
self.executor = ThreadPoolExecutor(THREAD_LIMIT)
self.stop_event = stop_event
self.finished_queue = mp.Queue()
self.dispatched_sockets = {}
for i in range(THREAD_LIMIT):
self.finished_queue.put(i)
def run(self):
"""
Run this worker.
The worker will start waiting for incoming clients being added to the queue and submit them to
the executor.
"""
while not self.stop_event.is_set():
# Blocks until the thread is free
self.finished_queue.get()
# Blocks until a new client connects
conn, addr = self.queue.get()
if conn is None or addr is None:
break
logging.debug("Processing new client: %s", addr)
# submit the client to the executor
self.executor.submit(self._handle_client, conn, addr)
self.shutdown()
def _handle_client(self, conn: socket.socket, addr):
"""
Target method for the worker threads.
Creates a RequestHandler and handles any exceptions which may occur.
@param conn: The client socket
@param addr: The address of the client.
"""
self.dispatched_sockets[threading.get_ident()] = conn
try:
self.__do_handle_client(conn, addr)
except Exception:
if not self.stop_event:
logging.debug("Internal error in thread:", exc_info=True)
self.dispatched_sockets.pop(threading.get_ident())
# Finished, put back into queue
self.finished_queue.put(threading.get_ident())
def __do_handle_client(self, conn: socket.socket, addr):
handler = RequestHandler(conn, self.host)
while True:
try:
handler.listen()
except HTTPServerCloseException as e:
# Exception raised after which the client should be disconnected.
logging.warning("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg)
RequestHandler.send_error(conn, e.status_code, e.message)
break
except HTTPServerException as e:
# Normal HTTP exception raised (e.a. 404) continue listening.
logging.debug("[HTTP: %s] %s. Reason: %s", e.status_code, e.message, e.arg)
RequestHandler.send_error(conn, e.status_code, e.message)
except socket.timeout:
# socket timed out, disconnect.
logging.info("Socket for client %s timed out.", addr)
break
except ConnectionAbortedError:
# Client aborted connection
logging.info("Socket for client %s disconnected.", addr)
break
except Exception as e:
# Unexpected exception raised. Send 500 and disconnect.
logging.error("Internal error", exc_info=e)
RequestHandler.send_error(conn, InternalServerError.status_code, InternalServerError.message)
break
conn.shutdown(socket.SHUT_RDWR)
conn.close()
def shutdown(self):
logging.info("shutting down")
# shutdown executor, but do not wait
self.executor.shutdown(False)
logging.info("Closing sockets")
# Copy dictionary to prevent issues with concurrency
clients = self.dispatched_sockets.copy().values()
for client in clients:
client: socket.socket
try:
client.shutdown(socket.SHUT_RDWR)
client.close()
except OSError:
# Ignore exception due to already closed sockets
pass
# Call shutdown again and wait this time
self.executor.shutdown()