import logging
import re
import urllib
from typing import Dict
from urllib.parse import urlparse, urlsplit

from httplib.exceptions import (
    InvalidStatusLine,
    InvalidResponse,
    BadRequest,
    InvalidRequestLine,
)
from httplib.httpsocket import FORMAT

# Request methods accepted by parse_request_line.
_HTTP_METHODS = ("CONNECT", "DELETE", "GET", "HEAD", "OPTIONS", "POST", "PUT", "TRACE")

# Lines that terminate the header section (with or without line ending).
_HEADER_END = ("\r\n", "\n", "")

# Matches the charset parameter of a Content-Type value, optionally quoted.
_CHARSET_PATTERN = re.compile(r"charset\s*=\s*[\"']?([a-z0-9_.:+\-]+)", re.I)


def _is_valid_http_version(http_version: str):
    """
    Returns True if the specified HTTP-version is valid.

    Only "HTTP/1.0" and "HTTP/1.1" are accepted.

    @param http_version: the string to be checked
    @return: True if the specified HTTP-version is valid.
    """
    # fullmatch anchors both ends, so trailing garbage such as "HTTP/1.0foo"
    # is rejected; the class [01] accepts exactly the digits 0 and 1.
    return re.fullmatch(r"HTTP/1\.[01]", http_version) is not None


def parse_status_line(line: str):
    """
    Parses the specified line as an HTTP status-line.

    @param line: the status-line to be parsed
    @raise InvalidStatusLine: if the line couldn't be parsed, if the HTTP-version is invalid or if the status code is
        invalid
    @return: tuple of the HTTP-version (e.g. "1.1"), status (as int) and reason
    """
    split = list(filter(None, line.strip().split(" ", 2)))
    if len(split) < 3:
        raise InvalidStatusLine(line)

    http_version, status, reason = split

    if not _is_valid_http_version(http_version):
        raise InvalidStatusLine(line)
    # Everything after "HTTP/" is the version number, matching what
    # parse_request_line returns.
    version = http_version[5:]

    # Exactly three ASCII digits; anything else must not reach int().
    if not re.fullmatch(r"[0-9]{3}", status):
        raise InvalidStatusLine(line)
    status = int(status)
    if status < 100:
        raise InvalidStatusLine(line)

    return version, status, reason


def parse_request_line(line: str):
    """
    Parses the specified line as an HTTP request-line.
    Returns the method, target as SplitResult and HTTP version from the request-line.

    @param line: the request-line to be parsed
    @raise InvalidRequestLine: if the line couldn't be parsed.
    @raise BadRequest: Invalid HTTP method, Invalid HTTP-version or Invalid target
    @return: tuple of the method, target and HTTP-version
    """
    split = list(filter(None, line.rstrip().split(" ", 2)))
    if len(split) < 3:
        raise InvalidRequestLine(line)

    method, target, version = split
    if method not in _HTTP_METHODS:
        raise BadRequest(f"Invalid method: {method}")

    if not _is_valid_http_version(version):
        logging.debug("[ABRT] request: invalid http-version=%r", version)
        raise BadRequest(f"Invalid HTTP-version: {version}")

    # Defensive: empty parts are already filtered out above.
    if not target:
        raise BadRequest()

    parsed_target = urlsplit(target)
    return method, parsed_target, version.split("/")[1]


def parse_headers(lines):
    """
    Parses header lines into a dict.

    Lines are consumed from the iterable until a blank line or the end of the iterable is reached.
    Lines starting with whitespace before the first header are discarded. Later lines starting with
    whitespace are treated as a continuation of the previous header and are joined to it with a
    single space.

    NOTE: both the header names and the header values in the result are lower-cased. Lower-casing
    the values loses information for case-sensitive values; this is existing behavior that is kept
    here because callers may compare against lower-case values.

    @param lines: iterable of header lines, starting right after the start-line
    @raise InvalidResponse: if check_next_header rejects a header
    @return: dict mapping the lower-cased header name to the lower-cased header value
    """
    lines = iter(lines)
    headers = []

    try:
        line = next(lines)
        # The first header after the start-line may not start with a space:
        # consume such lines without processing them.
        while line not in _HEADER_END and line[0].isspace():
            line = next(lines)

        while line not in _HEADER_END:
            text = line.rstrip("\r\n")
            if line[0].isspace() and headers:
                # Folded header: append to the previous one, separated by one space.
                headers[-1] = f"{headers[-1]} {text.lstrip()}"
            else:
                headers.append(text)
            line = next(lines)
    except StopIteration:
        # No more lines to be parsed
        pass

    result = {}
    for line in headers:
        pos = line.find(":")
        # Skip lines without a name or without anything after the colon.
        if pos <= 0 or pos >= len(line) - 1:
            continue

        header, value = (part.strip() for part in line.split(":", 1))
        header = header.lower()
        check_next_header(result, header, value)
        result[header] = value.lower()

    return result


def check_next_header(headers, next_header: str, next_value: str):
    """
    Validates a header that is about to be added to the specified headers.

    Only the content-length header is checked: it may occur at most once and its value must be a
    non-negative decimal number.

    @param headers: the headers parsed so far, keyed by lower-case header name
    @param next_header: name of the header to be added (compared case-insensitively)
    @param next_value: value of the header to be added
    @raise InvalidResponse: if the content-length header is repeated or has an invalid value
    """
    if next_header.lower() != "content-length":
        return

    if "content-length" in headers:
        logging.error("Multiple content-length headers specified")
        raise InvalidResponse()

    # ASCII digits only, so int() can never fail on the value; zero is a valid length.
    if not re.fullmatch(r"[0-9]+", next_value):
        logging.error("Invalid content-length value: %r", next_value)
        raise InvalidResponse()


def parse_uri(uri: str):
    """
    Splits the specified URI into a host, port and path.

    If no port is specified, 443 is used for the "https" scheme and 80 otherwise.
    The returned path includes the query (if any) and always starts with "/".

    @param uri: the URI to be split, with or without a scheme
    @raise ValueError: if the URI contains a port that is not a valid number
    @return: tuple of the host, port (as int) and path
    """
    parsed = urlsplit(uri)

    port = None
    if parsed.hostname:
        host = parsed.hostname
        # hostname excludes the port, so it has to be read separately.
        port = parsed.port
        path = parsed.path
        if parsed.query != "":
            path = f"{path}?{parsed.query}"
    else:
        # If there is no netloc, the given string is not a valid URI, so split on /
        host, _, path = uri.partition("/")
        if ":" in host:
            host, port_str = host.split(":", 1)
            if port_str:
                port = int(port_str)

    if port is None:
        port = 443 if parsed.scheme == "https" else 80

    if not path.startswith("/"):
        path = f"/{path}"

    return host, port, path


def get_uri(url: str):
    """
    Returns a valid URI of the specified URL.

    The result always uses the "http" scheme and consists of the netloc, path and query of the
    specified URL; the fragment is dropped.

    @param url: the URL to be converted
    @return: the resulting URI
    """
    parsed = urlsplit(url)

    result = f"http://{parsed.netloc}{parsed.path}"
    if parsed.query != "":
        result = f"{result}?{parsed.query}"

    return result


def urljoin(base, url):
    """
    Join a base url and a URL to form an absolute url.

    @param base: the base url
    @param url: the url to be joined with the base
    @return: the result of urllib.parse.urljoin for the specified arguments
    """
    return urllib.parse.urljoin(base, url)


def get_charset(headers: Dict[str, str]):
    """
    Returns the charset specified in the content-type header.

    @param headers: the headers, keyed by lower-case header name
    @return: the charset of the content-type header, or FORMAT if there is no content-type header
        or if it doesn't specify a charset
    """
    content_type = headers.get("content-type")
    if content_type:
        match = _CHARSET_PATTERN.search(content_type)
        if match:
            return match.group(1)

    return FORMAT