"""Helpers for parsing HTTP start-lines, headers, URIs and related values."""
import logging
import os
import pathlib
import re
import urllib
from typing import Dict, Iterable, Tuple
from urllib.parse import SplitResult, urlparse, urlsplit

from httplib.exceptions import InvalidStatusLine, InvalidResponse, BadRequest, InvalidRequestLine
from httplib.httpsocket import FORMAT

# Request methods accepted by `parse_request_line`.
_HTTP_METHODS = ("CONNECT", "DELETE", "GET", "HEAD", "OPTIONS", "POST", "PUT", "TRACE")

# Lines that terminate the header section ("" covers an exhausted/closed source).
_BLANK_LINES = ("\r\n", "\n", "")

# Matches `charset=<name>`, tolerating an opening quote around the name.
_CHARSET_PATTERN = re.compile(r"charset\s*=\s*[\"']?([a-z0-9_\-]+)", re.I)


def _is_valid_http_version(http_version: str) -> bool:
    """
    Returns True if the specified HTTP-version is valid.

    Only the exact strings `HTTP/1.0` and `HTTP/1.1` are accepted.

    @param http_version: the string to be checked
    @return: True if the specified HTTP-version is valid.
    """
    name, separator, version = http_version.partition("/")
    if name != "HTTP" or separator != "/":
        return False

    # fullmatch: reject trailing garbage such as "1.1x"; "[01]" rather than
    # "[0|1]", which would also accept a literal "|".
    return re.fullmatch(r"1\.[01]", version) is not None


def parse_status_line(line: str) -> Tuple[str, int, str]:
    """
    Parses the specified line as an HTTP status-line.

    @param line: the status-line to be parsed
    @raise InvalidStatusLine: if the line couldn't be parsed, if the HTTP-version is invalid or if the status code
        is invalid
    @return: tuple of the HTTP-version number (e.g. "1.1"), the status code as int and the reason phrase
    """
    parts = [part for part in line.strip().split(" ", 2) if part]

    # The reason phrase is allowed to be empty ("HTTP/1.1 200").
    if len(parts) == 2:
        parts.append("")
    if len(parts) != 3:
        raise InvalidStatusLine(line)

    http_version, status_text, reason = parts

    if not _is_valid_http_version(http_version):
        raise InvalidStatusLine(line)
    # Everything after "HTTP/" is the version number.
    version = http_version[5:]

    # Exactly three ASCII digits; anything else (e.g. "200abc", "2000") is invalid.
    if not re.fullmatch(r"[0-9]{3}", status_text):
        raise InvalidStatusLine(line)
    status = int(status_text)
    if status < 100:
        raise InvalidStatusLine(line)

    return version, status, reason


def parse_request_line(line: str) -> Tuple[str, SplitResult, str]:
    """
    Parses the specified line as an HTTP request-line.

    Returns the method, the target as a `SplitResult` and the HTTP version number from the request-line.

    @param line: the request-line to be parsed
    @raise InvalidRequestLine: if the line couldn't be parsed.
    @raise BadRequest: Invalid HTTP method, Invalid HTTP-version or Invalid target
    @return: tuple of the method, target and HTTP-version
    """
    parts = [part for part in line.rstrip().split(" ", 2) if part]
    if len(parts) < 3:
        raise InvalidRequestLine(line)

    method, target, version = parts

    if method not in _HTTP_METHODS:
        raise BadRequest(f"Invalid method: {method}")

    if not _is_valid_http_version(version):
        logging.debug("[ABRT] request: invalid http-version=%r", version)
        raise BadRequest(f"Invalid HTTP-version: {version}")

    if not target:
        raise BadRequest("request-target not specified")

    return method, urlsplit(target), version.split("/")[1]


def parse_headers(lines: Iterable[str]) -> Dict[str, str]:
    """
    Parses the lines from the `lines` iterator as headers.

    Parsing stops at the first blank line or when `lines` is exhausted. Lines starting with whitespace are treated
    as continuations of the previous header and are joined to it with a single space. Lines without a colon, with
    a leading colon or with nothing after the colon are ignored.

    @param lines: iterator to retrieve the lines from.
    @raise InvalidResponse: if the content-length header is repeated or has an invalid value
    @return: A dictionary with the lower-cased header as key and the lower-cased value as value.
    """
    lines = iter(lines)
    logical_lines = []

    try:
        line = next(lines)

        # The first header after the start-line may not start with a space:
        # discard such lines, but never consume the blank terminator line.
        while line not in _BLANK_LINES and line[0].isspace():
            line = next(lines)

        while line not in _BLANK_LINES:
            if line[0].isspace():
                # Continuation line. The loop above guarantees a previous
                # header exists by the time one of these is seen.
                logical_lines[-1] = f"{logical_lines[-1]} {line.strip()}"
            else:
                logical_lines.append(line.rstrip("\r\n"))
            line = next(lines)
    except StopIteration:
        # No more lines to be parsed
        pass

    result = {}
    for entry in logical_lines:
        pos = entry.find(":")
        if pos <= 0 or pos >= len(entry) - 1:
            continue

        header = entry[:pos].strip().lower()
        value = entry[pos + 1:].strip()

        # Validate with the normalised name so "Content-Length" is checked too.
        check_next_header(result, header, value)
        # NOTE: values are stored lower-cased, as before; callers comparing
        # against lower-case literals rely on this.
        result[header] = value.lower()

    return result


def check_next_header(headers, next_header: str, next_value: str):
    """
    Validates a header that is about to be added to `headers`.

    Only the content-length header is checked: it may occur once and its value must consist of ASCII digits.

    @param headers: the headers parsed so far, keyed by lower-cased name
    @param next_header: the name of the header to be added (any case)
    @param next_value: the value of the header to be added
    @raise InvalidResponse: if content-length is repeated or its value is not a non-negative integer
    """
    if next_header.lower() != "content-length":
        return

    if "content-length" in headers:
        raise InvalidResponse("Multiple content-length headers specified")

    # ASCII digits only; zero is a valid length (empty body).
    if not re.fullmatch(r"[0-9]+", next_value):
        raise InvalidResponse(f"Invalid content-length value: {next_value}")


def parse_uri(uri: str) -> Tuple[str, int, str]:
    """
    Parse the specified URI into the host, port and path.

    If the URI is invalid, this method will try to create one. The port defaults to 443 for the https scheme and
    to 80 otherwise; an explicit, numeric port in the URI takes precedence. The returned path always starts with
    "/" and includes the query string, if any.

    @param uri: the URI to be parsed
    @return: A tuple with the host, port (as int) and path
    """
    parsed = urlsplit(uri)
    default_port = 443 if parsed.scheme == "https" else 80

    if parsed.hostname:
        host = parsed.hostname
        try:
            port = parsed.port
        except ValueError:
            # Non-numeric or out-of-range port: fall back to the default.
            port = None
        if port is None:
            port = default_port

        path = parsed.path or "/"
        if parsed.query != "":
            path = f"{path}?{parsed.query}"
        return host, port, path

    # If there is no hostname, the given string is not a valid URI, so split on /
    authority, slash, remainder = uri.partition("/")
    path = f"/{remainder}" if slash else "/"

    host, _, port_text = authority.partition(":")
    if re.fullmatch(r"[0-9]+", port_text):
        port = int(port_text)
    else:
        port = default_port

    return host, port, path


def get_uri(url: str) -> str:
    """
    Returns a valid URI of the specified URL.

    The result is built from the network location, path and query of `url`; the scheme is always written as
    `http` and any fragment is dropped.

    @param url: the URL to convert
    @return: the resulting URI
    """
    parsed = urlsplit(url)

    result = f"http://{parsed.netloc}{parsed.path}"
    if parsed.query != "":
        result = f"{result}?{parsed.query}"

    return result


def urljoin(base, url):
    """
    Join a base url and a URL to form an absolute url.

    @param base: the base url
    @param url: the (possibly relative) url to resolve against `base`
    @return: the result of `urllib.parse.urljoin(base, url)`
    """
    return urllib.parse.urljoin(base, url)


def get_charset(headers: Dict[str, str]) -> str:
    """
    Returns the charset of the content from the headers if found. Otherwise returns `FORMAT`

    The charset is read from the `charset=` parameter of the content-type header; an opening quote around the
    name is skipped. An absent or empty charset parameter yields `FORMAT`.

    @param headers: the headers to retrieve the charset from
    @return: A charset
    """
    match = _CHARSET_PATTERN.search(headers.get("content-type", ""))
    if match:
        return match.group(1)

    return FORMAT


def get_relative_save_path(path: str) -> str:
    """
    Returns the specified path relative to the working directory.

    @param path: the path to compute
    @raise ValueError: if `path` is not located inside the current working directory
    @return: the relative path
    """
    working_dir = pathlib.PurePath(os.getcwd())
    relative = pathlib.PurePath(path).relative_to(working_dir)
    return str(relative)