"""Worker process entry point: serves queued client connections on a thread pool."""
import logging
import multiprocessing as mp
import socket
import threading
from concurrent.futures import ThreadPoolExecutor

from httplib.exceptions import (
    HTTPServerException,
    InternalServerError,
    HTTPServerCloseException,
)
from server.requesthandler import RequestHandler

# Size of each worker's thread pool and number of concurrency tokens it hands out.
THREAD_LIMIT = 128


def worker(address, name, logging_level, queue: mp.Queue, stop_event: mp.Event):
    """Configure logging, build a ``Worker`` and run it until it stops.

    Args:
        address: Passed to ``Worker`` as its host value.
        name: Worker identifier; embedded in the log line prefix.
        logging_level: Level handed to ``logging.basicConfig``.
        queue: Queue from which ``(conn, addr)`` pairs are read.
        stop_event: Event that, once set, ends the worker's accept loop.
    """
    logging.basicConfig(
        level=logging_level,
        format=f"%(levelname)s:[WORKER {name!s}] %(message)s",
    )
    runner = Worker(address, name, queue, stop_event)
    logging.debug("started")

    try:
        runner.run()
    except KeyboardInterrupt:
        logging.debug("Ctrl+C pressed, terminating")
        runner.shutdown()


class Worker:
    """Pulls client connections off a queue and serves each on a pool thread.

    Concurrency is bounded by ``finished_queue``: it starts with
    ``THREAD_LIMIT`` tokens, ``run`` takes one before accepting a client and
    ``_handle_client`` puts one back when the client is done. The token
    values themselves are never inspected.
    """

    host: str
    name: str
    queue: mp.Queue
    executor: ThreadPoolExecutor
    stop_event: mp.Event
    finished_queue: mp.Queue

    def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event):
        """Store the collaborators and pre-fill the token queue.

        Args:
            host: Value forwarded to every ``RequestHandler``.
            name: Identifier of this worker.
            queue: Source of ``(conn, addr)`` pairs.
            stop_event: Event checked at the top of each ``run`` iteration.
        """
        self.host = host
        self.name = name
        self.queue = queue
        self.executor = ThreadPoolExecutor(THREAD_LIMIT)
        self.stop_event = stop_event
        self.finished_queue = mp.Queue()

        # One token per pool thread.
        for i in range(THREAD_LIMIT):
            self.finished_queue.put(i)

    def run(self):
        """Dispatch queued clients to the thread pool until told to stop.

        The loop ends when ``stop_event`` is set or when the queue yields an
        item whose connection or address is ``None``; the executor is shut
        down afterwards. Note that ``stop_event`` is only re-checked between
        clients, because both ``get`` calls block.
        """
        while not self.stop_event.is_set():
            # Blocks until thread is free
            self.finished_queue.get()

            # Blocks until new client connects
            conn, addr = self.queue.get()

            if conn is None or addr is None:
                break

            logging.debug("Processing new client: %s", addr)

            # submit client to thread
            self.executor.submit(self._handle_client, conn, addr)

        self.shutdown()

    def _handle_client(self, conn: socket.socket, addr):
        """Serve requests from one client, then release its socket and slot.

        A new ``RequestHandler`` is created for every pass of the loop. The
        loop ends on ``HTTPServerCloseException``, ``socket.timeout`` or any
        unexpected exception; a plain ``HTTPServerException`` is reported to
        the client and the loop continues.

        Cleanup lives in ``finally`` so that the connection is closed and the
        token is returned even if sending the error response or shutting the
        socket down raises. Otherwise each such failure would permanently
        consume one of the ``THREAD_LIMIT`` slots and ``run`` would eventually
        block forever waiting for a free one.

        Args:
            conn: Connected client socket.
            addr: Client address, used only for logging.
        """
        try:
            while True:
                try:
                    handler = RequestHandler(conn, self.host)
                    handler.listen()
                except HTTPServerCloseException as e:
                    logging.debug("HTTP Exception:", exc_info=e)
                    RequestHandler.send_error(conn, e.status_code, e.message)
                    break
                except HTTPServerException as e:
                    logging.debug("HTTP Exception:", exc_info=e)
                    RequestHandler.send_error(conn, e.status_code, e.message)
                except socket.timeout:
                    logging.debug("Socket for client %s timed out", addr)
                    break
                except Exception as e:
                    logging.debug("Internal error", exc_info=e)
                    RequestHandler.send_error(
                        conn,
                        InternalServerError.status_code,
                        InternalServerError.message,
                    )
                    break
        finally:
            try:
                self._close_connection(conn, addr)
            finally:
                # Finished, put back into queue
                self.finished_queue.put(threading.get_ident())

    @staticmethod
    def _close_connection(conn: socket.socket, addr) -> None:
        """Shut down and close ``conn``, tolerating an already-dropped peer."""
        try:
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            # shutdown() raises if the socket is no longer connected; the
            # descriptor still has to be released below.
            logging.debug("Socket for client %s was already disconnected", addr)
        finally:
            conn.close()

    def shutdown(self):
        """Log the shutdown and shut down the thread pool executor."""
        logging.info("shutting down")
        self.executor.shutdown()