This commit is contained in:
2021-03-22 02:41:49 +01:00
parent d25d2ef993
commit 42f1661e0a
10 changed files with 172 additions and 54 deletions

View File

@@ -4,23 +4,22 @@ import multiprocessing as mp
import threading
from concurrent.futures import ThreadPoolExecutor
from logging import Logger
from socket import socket
import socket
from server.RequestHandler import RequestHandler
THREAD_LIMIT = 20
THREAD_LIMIT = 128
def worker(address, name, log_level, queue: mp.Queue, stop_event: mp.Event):
logging.basicConfig(level=log_level)
logger = multiprocessing.log_to_stderr(level=log_level)
runner = Worker(address, name, logger, queue, stop_event)
runner.logger.debug("Worker %s started", name)
def worker(address, name, logging_level, queue: mp.Queue, stop_event: mp.Event):
logging.basicConfig(level=logging_level, format="%(levelname)s:[WORKER " + str(name) + "] %(message)s")
runner = Worker(address, name, queue, stop_event)
logging.debug("started")
try:
runner.run()
except KeyboardInterrupt:
logger.debug("Ctrl+C pressed, terminating")
logging.debug("Ctrl+C pressed, terminating")
runner.shutdown()
@@ -35,10 +34,9 @@ class Worker:
finished_queue: mp.Queue
def __init__(self, host, name, logger, queue: mp.Queue, stop_event: mp.Event):
def __init__(self, host, name, queue: mp.Queue, stop_event: mp.Event):
self.host = host
self.name = name
self.logger = logger
self.queue = queue
self.executor = ThreadPoolExecutor(THREAD_LIMIT)
self.stop_event = stop_event
@@ -58,26 +56,27 @@ class Worker:
if conn is None or addr is None:
break
self.logger.debug("Received new client: %s", addr)
logging.debug("Processing new client: %s", addr)
# submit client to thread
print(threading.get_ident())
self.executor.submit(self._handle_client, conn, addr)
self.shutdown()
def _handle_client(self, conn: socket, addr):
def _handle_client(self, conn: socket.socket, addr):
try:
self.logger.debug("Handling client: %s", addr)
logging.debug("Handling client: %s", addr)
handler = RequestHandler(conn, self.logger, self.host)
handler = RequestHandler(conn, self.host)
handler.listen()
except Exception as e:
self.logger.debug("Internal error", exc_info=e)
logging.debug("Internal error")
conn.shutdown(socket.SHUT_RDWR)
conn.close()
# Finished, put back into queue
self.finished_queue.put(threading.get_ident())
def shutdown(self):
self.logger.info("shutting down")
logging.info("shutting down")
self.executor.shutdown()