import logging
import re
import socket
from typing import Dict, Tuple

BUFSIZE = 4096
TIMEOUT = 3
FORMAT = "UTF-8"
# Number of times receive() tries to read from the socket before giving up.
MAX_RECEIVE_ATTEMPTS = 3

# Only HTTP/1.0 and HTTP/1.1 are accepted; both patterns are used with
# fullmatch() so trailing garbage ("1.15", "2000") is rejected.
_HTTP_VERSION_RE = re.compile(r"1\.[01]")
_STATUS_CODE_RE = re.compile(r"[0-9]{3}")


class HTTPClient(socket.socket):
    """TCP socket with helpers for receiving and parsing HTTP responses."""

    host: str

    def __init__(self, host: str):
        """Create an IPv4 stream socket with a TIMEOUT-second timeout.

        :param host: host name stored on the instance as ``self.host``
        """
        super().__init__(socket.AF_INET, socket.SOCK_STREAM)
        self.settimeout(TIMEOUT)
        self.host = host

    def _do_receive(self) -> bytes:
        """Read at most BUFSIZE bytes from the socket.

        :raises Exception: if the socket has already been closed
        """
        # fileno() returns -1 once the underlying descriptor is closed.
        if self.fileno() == -1:
            raise Exception("Connection closed")

        return self.recv(BUFSIZE)

    def receive(self) -> bytes:
        """Receive up to BUFSIZE bytes from the remote peer.

        The read is retried when the socket times out, up to
        MAX_RECEIVE_ATTEMPTS attempts in total.

        :return: the bytes returned by the first successful read
        :raises TimeoutError: if every attempt timed out
        """
        for attempt in range(1, MAX_RECEIVE_ATTEMPTS + 1):
            try:
                return self._do_receive()
            except socket.timeout:
                logging.debug("Socket receive timed out after %s seconds", TIMEOUT)
                if attempt < MAX_RECEIVE_ATTEMPTS:
                    logging.debug("Retrying %s", attempt)

        logging.debug(
            "Timed out after waiting %s seconds for response",
            TIMEOUT * MAX_RECEIVE_ATTEMPTS,
        )
        raise TimeoutError("Request timed out")

    def validate_status_line(self, status_line: str) -> bool:
        """Check whether *status_line* looks like an HTTP/1.x status line.

        The line must contain at least three space-separated parts: the HTTP
        version (``HTTP/1.0`` or ``HTTP/1.1``), a three-digit status code and
        a reason phrase.

        :param status_line: first line of the response
        :return: ``True`` if the line is acceptable, ``False`` otherwise
        :raises InvalidStatusLine: if the version token is shorter than
            8 characters or has no ``/`` at index 4
        """
        # Drop empty strings produced by consecutive spaces.
        parts = [part for part in status_line.split(" ") if part]
        if len(parts) < 3:
            return False

        http_version, status_code = parts[0], parts[1]
        if len(http_version) < 8 or http_version[4] != "/":
            raise InvalidStatusLine(status_line)

        name, version = http_version[:4], http_version[5:]
        if name != "HTTP" or not _HTTP_VERSION_RE.fullmatch(version):
            return False

        return _STATUS_CODE_RE.fullmatch(status_code) is not None

    def get_crlf_chunk(self, buffer: bytes) -> Tuple[bytes, bytes]:
        """Split *buffer* at the first blank line.

        A blank line is two consecutive line breaks, either ``LF LF`` or
        ``CRLF CRLF``; whichever occurs first in the buffer is used.

        :param buffer: raw bytes to split
        :return: tuple of the bytes before the blank line and the bytes
            after it; if there is no blank line, the whole buffer and ``b""``
        """
        lf_pos = buffer.find(b"\n\n")
        crlf_pos = buffer.find(b"\r\n\r\n")

        # find() returns -1 for "not found", so each position has to be
        # checked on its own before the two are compared.
        if lf_pos == -1 and crlf_pos == -1:
            return buffer, b""

        if crlf_pos == -1 or (lf_pos != -1 and lf_pos < crlf_pos):
            split_start, separator_len = lf_pos, 2
        else:
            split_start, separator_len = crlf_pos, 4

        return buffer[:split_start], buffer[split_start + separator_len:]

    def parse_headers(self, data: bytes) -> Tuple[str, Dict[str, str]]:
        """Parse the start-line and header fields of a message head.

        Header names and values are stripped and lower-cased. Lines without
        a colon, with an empty name, or with an empty value are ignored.

        :param data: raw bytes of the message head
        :return: tuple of the start-line and a dict of the parsed headers
        :raises InvalidResponse: if *data* contains no non-blank line
        """
        # NOTE: every line is stripped before filtering, so a line that
        # started with whitespace (a folded header) is handled like any
        # other line.
        lines = [
            line
            for line in (raw.strip() for raw in data.decode(FORMAT).split("\n"))
            if line
        ]

        if not lines:
            raise InvalidResponse(data)

        start_line, header_lines = lines[0], lines[1:]
        logging.debug("start-line: %r", start_line)

        headers: Dict[str, str] = {}
        for line in header_lines:
            pos = line.find(":")

            # Skip lines with no colon, an empty name or an empty value.
            if pos <= 0 or pos >= len(line) - 1:
                continue

            name, _, value = line.partition(":")
            headers[name.strip().lower()] = value.strip().lower()

        logging.debug("Parsed headers: %r", headers)

        return start_line, headers


class HTTPException(Exception):
    """Base class for HTTP exceptions."""


class InvalidResponse(HTTPException):
    """Response message cannot be parsed."""

    def __init__(self, message):
        """
        :param message: the data that could not be parsed
        """
        super().__init__(message)
        self.message = message


class InvalidStatusLine(HTTPException):
    """Response status line is invalid."""

    def __init__(self, line):
        """
        :param line: the offending status line
        """
        super().__init__(line)
        self.line = line


class UnsupportedEncoding(HTTPException):
    """Response encoding is not supported."""

    def __init__(self, enc_type, encoding):
        """
        :param enc_type: kind of encoding that was encountered
        :param encoding: the encoding value that is not supported
        """
        super().__init__(enc_type, encoding)
        self.enc_type = enc_type
        self.encoding = encoding