import logging
import multiprocessing as mp
import socket
from multiprocessing.context import Process
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from queue import Full
from typing import List, Optional

from server import worker


class HTTPServer:
    """
    Accept TCP connections and hand them to a pool of worker processes.

    The parent process owns the listening socket. Every accepted connection
    is put on a bounded multiprocessing queue together with the peer address;
    the processes running `worker.worker` receive that queue and a shared
    stop event as arguments.
    """

    # Seconds to wait for room in the dispatch queue when posting a wake-up
    # item during shutdown. Without a bound, a full queue whose workers are
    # gone would block shutdown forever.
    _SENTINEL_PUT_TIMEOUT = 1.0
    # Seconds to give a worker to exit on its own before it is terminated.
    _WORKER_JOIN_TIMEOUT = 10.0

    address: str
    port: int
    workers: List[Process]
    worker_count: int
    server: Optional[socket.socket]
    _dispatch_queue: Queue
    _stop_event: Event

    def __init__(self, address: str, port: int, worker_count: int, logging_level) -> None:
        """
        Initialize an HTTP server with the specified address, port,
        worker_count and logging_level.

        No socket is opened and no process is started here; that happens
        in `start`.

        @param address: the address to listen on for connections
        @param port: the port to listen on for connections
        @param worker_count: the amount of worker processes to create; also
                             used as the maximum size of the dispatch queue
        @param logging_level: verbosity level for the logger, passed on
                              unchanged to every worker process
        @raise RuntimeError: if the multiprocessing start method was already
                             fixed to something other than "spawn"
        """
        self.address = address
        self.port = port
        self.worker_count = worker_count
        self.logging_level = logging_level

        # Per-instance state. A list defined on the class would be shared by
        # every HTTPServer instance in the process.
        self.workers = []
        self.server = None

        # The start method can only be set once per interpreter, so skip the
        # call when it is already "spawn" (e.g. a second server instance).
        if mp.get_start_method(allow_none=True) != "spawn":
            mp.set_start_method("spawn")

        # Create a queue with maximum size of worker_count.
        self._dispatch_queue = mp.Queue(worker_count)
        self._stop_event = mp.Event()

    def start(self) -> None:
        """
        Start the HTTP server.

        Blocks in the accept loop until it is interrupted. A
        KeyboardInterrupt is treated as a normal stop request and swallowed;
        any other exception is re-raised after the workers and the socket
        have been cleaned up.
        """
        try:
            self.__do_start()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the server.
            pass
        finally:
            # Always release the socket and stop the workers, also when
            # binding or accepting failed.
            self.__shutdown()

    def __do_start(self) -> None:
        """
        Internal method to start the server.

        Creates and binds the listening socket, starts the worker processes
        and then enters the accept loop.
        """
        # Create socket
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((self.address, self.port))

        # Create workers processes to handle requests
        self.__create_workers()

        self.__listen()

    def __listen(self) -> None:
        """
        Start listening for new connections.

        If a connection is received, it will be dispatched to the worker
        queue, and picked up by a worker process. This method only returns
        by raising.
        """
        self.server.listen()
        logging.debug("Listening on %s:%d", self.address, self.port)

        while True:
            conn, addr = self.server.accept()
            conn.settimeout(5)

            logging.info("New connection: %s", addr[0])

            # blocks when the queue is full (contains self.worker_count items).
            self._dispatch_queue.put((conn, addr))
            logging.debug("Dispatched connection %s", addr)

    def __shutdown(self) -> None:
        """
        Cleanly shutdown the server.

        Notifies the worker processes to shutdown, waits a bounded time for
        them to exit (terminating the ones that do not) and eventually closes
        the server socket. Safe to call when startup failed half-way.
        """
        # Set stop event
        self._stop_event.set()

        # Wake up workers: one (None, None) item per worker, presumably read
        # by worker.worker as a signal to re-check the stop event.
        logging.debug("Waking up workers")
        for _ in self.workers:
            try:
                self._dispatch_queue.put((None, None), timeout=self._SENTINEL_PUT_TIMEOUT)
            except Full:
                # Nobody is draining the queue; further puts would only wait
                # again. Workers that stay up are terminated below.
                logging.debug("Dispatch queue is full, skipping remaining wake-ups")
                break

        logging.debug("Closing dispatch queue")
        self._dispatch_queue.close()

        logging.debug("Waiting for workers to shutdown")
        p: Process
        for p in self.workers:
            p.join(self._WORKER_JOIN_TIMEOUT)
            if p.is_alive():
                logging.warning("Worker %s did not exit in time, terminating it", p.name)
                p.terminate()
                p.join()

        logging.debug("Shutting down socket")
        if self.server is not None:
            try:
                self.server.shutdown(socket.SHUT_RDWR)
            except OSError:
                # A socket that was never connected (or is already closed)
                # can reject shutdown(); closing it is what matters.
                logging.debug("Socket shutdown failed, closing anyway", exc_info=True)
            finally:
                self.server.close()

    def __create_workers(self) -> None:
        """
        Create worker processes up to `self.worker_count`.

        A worker process is created with target `worker.worker` using the
        process-wide start method ("spawn", see `__init__`). It receives the
        "address:port" string, its 1-based index, `self.logging_level`,
        `self._dispatch_queue` and `self._stop_event`.
        """
        for i in range(self.worker_count):
            logging.debug("Creating worker: %d", i + 1)
            p = mp.Process(
                target=worker.worker,
                args=(
                    f"{self.address}:{self.port}",
                    i + 1,
                    self.logging_level,
                    self._dispatch_queue,
                    self._stop_event,
                ),
            )
            p.start()
            # Only started processes are tracked, so every entry can be joined.
            self.workers.append(p)