Files
CN2021/server/httpserver.py

124 lines
3.7 KiB
Python

import logging
import multiprocessing as mp
import socket
from multiprocessing.context import Process
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from server import worker
class HTTPServer:
address: str
port: int
workers = []
worker_count: int
server: socket
_dispatch_queue: Queue
_stop_event: Event
def __init__(self, address: str, port: int, worker_count, logging_level):
"""
Initialize an HTTP server with the specified address, port, worker_count and logging_level
@param address: the address to listen on for connections
@param port: the port to listen on for connections
@param worker_count: The amount of worker processes to create
@param logging_level: verbosity level for the logger
"""
self.address = address
self.port = port
self.worker_count = worker_count
self.logging_level = logging_level
mp.set_start_method("spawn")
# Create a queue with maximum size of worker_count.
self._dispatch_queue = mp.Queue(worker_count)
self._stop_event = mp.Event()
def start(self):
"""
Start the HTTP server.
"""
try:
self.__do_start()
except KeyboardInterrupt:
self.__shutdown()
def __do_start(self):
"""
Internal method to start the server.
"""
# Create socket
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((self.address, self.port))
# Create workers processes to handle requests
self.__create_workers()
self.__listen()
def __listen(self):
"""
Start listening for new connections
If a connection is received, it will be dispatched to the worker queue, and picked up by a worker process.
"""
self.server.listen()
logging.debug("Listening on %s:%d", self.address, self.port)
while True:
conn, addr = self.server.accept()
conn.settimeout(5)
logging.info("New connection: %s", addr[0])
# blocks when the queue is full (contains self.worker_count items).
self._dispatch_queue.put((conn, addr))
logging.debug("Dispatched connection %s", addr)
def __shutdown(self):
"""
Cleanly shutdown the server
Notifies the worker processes to shutdown and eventually closes the server socket
"""
# Set stop event
self._stop_event.set()
# Wake up workers
logging.debug("Waking up workers")
for _ in self.workers:
self._dispatch_queue.put((None, None))
logging.debug("Closing dispatch queue")
self._dispatch_queue.close()
logging.debug("Waiting for workers to shutdown")
p: Process
for p in self.workers:
p.join()
p.terminate()
logging.debug("Shutting down socket")
self.server.shutdown(socket.SHUT_RDWR)
self.server.close()
def __create_workers(self):
"""
Create worker processes up to `self.worker_count`.
A worker process is created with start method "spawn", target `worker.worker` and the `self.logging_level`
is passed along with the `self.dispatch_queue` and `self._stop_event`
"""
for i in range(self.worker_count):
logging.debug("Creating worker: %d", i + 1)
p = mp.Process(target=worker.worker,
args=(f"{self.address}:{self.port}", i + 1, self.logging_level, self._dispatch_queue,
self._stop_event))
p.start()
self.workers.append(p)