From 0ffdc73a6d8033b2cbee4c5dc398eb1d48506ce3 Mon Sep 17 00:00:00 2001 From: Arthur Bols Date: Sun, 28 Mar 2021 01:09:11 +0100 Subject: [PATCH] server: use queue blocking instead of sleep --- server/httpserver.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/server/httpserver.py b/server/httpserver.py index 161cceb..16780e2 100644 --- a/server/httpserver.py +++ b/server/httpserver.py @@ -1,7 +1,6 @@ import logging import multiprocessing as mp import socket -import time from multiprocessing.context import Process from multiprocessing.queues import Queue from multiprocessing.synchronize import Event @@ -10,9 +9,6 @@ from server import worker class HTTPServer: - """ - - """ address: str port: int workers = [] @@ -24,11 +20,11 @@ class HTTPServer: def __init__(self, address: str, port: int, worker_count, logging_level): """ - Initialize a HTTP server with the specified address, port, worker_count and logging_level + Initialize an HTTP server with the specified address, port, worker_count and logging_level @param address: the address to listen on for connections @param port: the port to listen on for connections - @param worker_count: - @param logging_level: + @param worker_count: The amount of worker processes to create + @param logging_level: verbosity level for the logger """ self.address = address self.port = port @@ -36,7 +32,9 @@ class HTTPServer: self.logging_level = logging_level mp.set_start_method("spawn") - self._dispatch_queue = mp.Queue() + + # Create a queue with maximum size of worker_count. + self._dispatch_queue = mp.Queue(worker_count) self._stop_event = mp.Event() def start(self): @@ -51,12 +49,9 @@ class HTTPServer: def __do_start(self): """ Internal method to start the server. - - @raise: """ # Create socket self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.address, self.port)) # Create workers processes to handle requests @@ -75,14 +70,12 @@ class HTTPServer: logging.debug("Listening on %s:%d", self.address, self.port) while True: - if self._dispatch_queue.qsize() > self.worker_count: - time.sleep(0.01) - continue - conn, addr = self.server.accept() conn.settimeout(5) logging.info("New connection: %s", addr[0]) + + # blocks when the queue is full (contains self.worker_count items). self._dispatch_queue.put((conn, addr)) logging.debug("Dispatched connection %s", addr) @@ -98,7 +91,7 @@ class HTTPServer: # Wake up workers logging.debug("Waking up workers") - for p in self.workers: + for _ in self.workers: self._dispatch_queue.put((None, None)) logging.debug("Closing dispatch queue") @@ -120,7 +113,6 @@ class HTTPServer: A worker process is created with start method "spawn", target `worker.worker` and the `self.logging_level` is passed along with the `self.dispatch_queue` and `self._stop_event` - """ for i in range(self.worker_count): logging.debug("Creating worker: %d", i + 1) @@ -129,6 +121,3 @@ class HTTPServer: self._stop_event)) p.start() self.workers.append(p) - - time.sleep(0.2) - time.sleep(1)