This commit is contained in:
2021-03-27 16:30:53 +01:00
parent fdbd865889
commit 3615c56152
14 changed files with 280 additions and 110 deletions

View File

@@ -10,6 +10,9 @@ from server import worker
class HTTPServer:
"""
"""
address: str
port: int
workers = []
@@ -20,6 +23,13 @@ class HTTPServer:
_stop_event: Event
def __init__(self, address: str, port: int, worker_count, logging_level):
"""
Initialize a HTTP server with the specified address, port, worker_count and logging_level
@param address: the address to listen on for connections
@param port: the port to listen on for connections
@param worker_count:
@param logging_level:
"""
self.address = address
self.port = port
self.worker_count = worker_count
@@ -30,24 +40,39 @@ class HTTPServer:
self._stop_event = mp.Event()
def start(self):
"""
Start the HTTP server.
"""
try:
self.__do_start()
except KeyboardInterrupt:
self.__shutdown()
def __do_start(self):
"""
Internal method to start the server.
@raise:
"""
# Create socket
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((self.address, self.port))
# Create workers processes to handle requests
self.__create_workers()
self.__listen()
def __listen(self):
"""
Start listening for new connections
If a connection is received, it will be dispatched to the worker queue, and picked up by a worker process.
"""
self.server.listen()
logging.debug("Listening for connections")
logging.debug("Listening on %s:%d", self.address, self.port)
while True:
if self._dispatch_queue.qsize() > self.worker_count:
@@ -62,6 +87,11 @@ class HTTPServer:
logging.debug("Dispatched connection %s", addr)
def __shutdown(self):
"""
Cleanly shutdown the server
Notifies the worker processes to shutdown and eventually closes the server socket
"""
# Set stop event
self._stop_event.set()
@@ -85,10 +115,18 @@ class HTTPServer:
self.server.close()
def __create_workers(self):
"""
Create worker processes up to `self.worker_count`.
A worker process is created with start method "spawn", target `worker.worker` and the `self.logging_level`
is passed along with the `self.dispatch_queue` and `self._stop_event`
"""
for i in range(self.worker_count):
logging.debug("Creating worker: %d", i + 1)
p = mp.Process(target=worker.worker,
args=(f"{self.address}:{self.port}", i + 1, self.logging_level, self._dispatch_queue, self._stop_event))
args=(f"{self.address}:{self.port}", i + 1, self.logging_level, self._dispatch_queue,
self._stop_event))
p.start()
self.workers.append(p)