Files
CN2021/httplib/parser.py
2021-03-28 19:53:14 +02:00

242 lines
6.6 KiB
Python

import logging
import os
import pathlib
import re
import urllib
from datetime import datetime
from time import mktime
from typing import Dict
from urllib.parse import urlparse, urlsplit
from wsgiref.handlers import format_date_time
from httplib.exceptions import InvalidStatusLine, InvalidResponse, BadRequest, InvalidRequestLine
from httplib.httpsocket import FORMAT
def _is_valid_http_version(http_version: str):
    """
    Returns True if the specified HTTP-version is valid.

    Only the exact strings "HTTP/1.0" and "HTTP/1.1" are accepted. The name is case-sensitive and
    no leading or trailing characters are allowed.

    @param http_version: the string to be checked
    @return: True if the specified HTTP-version is valid.
    """
    # fullmatch anchors both ends, so values such as "HTTP/1.1junk" are rejected.
    # The character class is [01]; a "|" inside a class would be matched literally.
    return re.fullmatch(r"HTTP/1\.[01]", http_version) is not None
def parse_status_line(line: str):
    """
    Parses the specified line as an HTTP status-line.

    @param line: the status-line to be parsed
    @raise InvalidStatusLine: if the line couldn't be parsed, if the HTTP-version is invalid or if the status code
        is invalid
    @return: tuple of the HTTP version number (e.g. "1.1"), the status code as an int and the reason phrase
    """
    # Split into at most three parts so the reason phrase keeps its internal spaces.
    parts = list(filter(None, line.strip().split(" ", 2)))
    if len(parts) < 3:
        raise InvalidStatusLine(line)
    http_version, status, reason = parts

    if not _is_valid_http_version(http_version):
        raise InvalidStatusLine(line)
    # Keep only the "<major>.<minor>" digits that follow "HTTP/", matching what
    # parse_request_line returns for its version element.
    version = http_version[5:8]

    # Exactly three ASCII digits; anything else (e.g. "200x") is rejected here
    # instead of surfacing as a ValueError from int().
    if not re.fullmatch(r"[0-9]{3}", status):
        raise InvalidStatusLine(line)
    status = int(status)
    if status < 100:
        raise InvalidStatusLine(line)

    return version, status, reason
def parse_request_line(line: str):
    """
    Parses the specified line as an HTTP request-line.

    Returns the method, the target as a `urllib.parse.SplitResult` and the HTTP version number
    from the request-line.

    @param line: the request-line to be parsed
    @raise InvalidRequestLine: if the line couldn't be parsed.
    @raise BadRequest: Invalid HTTP method, Invalid HTTP-version or Invalid target
    @return: tuple of the method, target and HTTP-version
    """
    parts = list(filter(None, line.rstrip().split(" ", 2)))
    if len(parts) < 3:
        raise InvalidRequestLine(line, "missing argument in request-line")
    method, target, version = parts

    if method not in ("CONNECT", "DELETE", "GET", "HEAD", "OPTIONS", "POST", "PUT", "TRACE"):
        raise BadRequest(f"Invalid method: {method}")

    if not _is_valid_http_version(version):
        logging.debug("[ABRT] request: invalid http-version=%r", version)
        raise BadRequest(f"Invalid HTTP-version: {version}")

    # Defensive guard: empty parts are already filtered out above.
    if not target:
        raise BadRequest("request-target not specified")

    try:
        parsed_target = urlsplit(target)
    except ValueError as err:
        # urlsplit rejects malformed targets (e.g. an unbalanced IPv6 bracket);
        # report that as a client error instead of leaking ValueError.
        raise BadRequest(f"Invalid request-target: {target}") from err

    return method, parsed_target, version.split("/")[1]
def parse_headers(lines):
    """
    Parses the lines from the `lines` iterator as headers.

    Reading stops at the first blank line or when the iterator is exhausted. Lines that start with
    whitespace before the first header are skipped. A later line that starts with whitespace is
    treated as a continuation of the previous header and is joined to it with a single space.
    Lines without a colon, with an empty name or with nothing after the colon are ignored.

    @param lines: iterator to retrieve the lines from.
    @raise InvalidResponse: if `check_next_header` rejects a header
    @return: A dictionary with the lower-cased header as key and the lower-cased value as value.
    """
    end_of_headers = ("\r\n", "\r", "\n", "")
    headers = []

    try:
        line = next(lines)

        # The first header after the start-line may not start with a space: skip such lines.
        # The blank-line test comes first because "\r\n" itself starts with whitespace and
        # must terminate the header section instead of being skipped.
        while line not in end_of_headers and line[0].isspace():
            line = next(lines)

        while line not in end_of_headers:
            if line[0].isspace():
                # Folded header: append the continuation to the previous logical line.
                # headers is never empty here because the first collected line cannot
                # start with whitespace.
                headers[-1] = f"{headers[-1]} {line.strip()}"
            else:
                headers.append(line.rstrip("\r\n"))
            line = next(lines)
    except StopIteration:
        # No more lines to be parsed
        pass

    result = {}
    for line in headers:
        pos = line.find(":")
        if pos <= 0 or pos >= len(line) - 1:
            continue

        (header, value) = map(str.strip, line.split(":", 1))
        check_next_header(result, header, value)
        result[header.lower()] = value.lower()

    return result
def check_next_header(headers, next_header: str, next_value: str):
    """
    Validates a header that is about to be added to the already parsed headers.

    Only the content-length header is checked; every other header is accepted as is.

    @param headers: the headers parsed so far, keyed by lower-cased header name
    @param next_header: the name of the header to be added, in any letter case
    @param next_value: the value of the header to be added
    @raise InvalidResponse: if content-length is specified more than once or if its value is not a
        non-negative decimal integer
    """
    # Header names are case-insensitive, so compare the lower-cased name.
    if next_header.lower() != "content-length":
        return

    if "content-length" in headers:
        raise InvalidResponse("Multiple content-length headers specified")

    # ASCII digits only: this accepts 0 (an empty body) and rejects signs, blanks and
    # Unicode numerics such as superscripts that int() cannot parse.
    if not re.fullmatch(r"[0-9]+", next_value):
        raise InvalidResponse(f"Invalid content-length value: {next_value}")
def parse_uri(uri: str):
    """
    Parse the specified URI into the host, port and path.

    If the URI is invalid, this method will try to create one: everything before the first "/" is
    taken as "host[:port]" and the remainder as the path.

    The port is always returned as an int. When no port is specified it defaults to 443 for the
    "https" scheme and to 80 otherwise. The path always starts with "/" and includes the query
    string if there is one.

    @param uri: the URI to be parsed
    @raise ValueError: if the specified port is not a valid port number
    @return: A tuple with the host, port and path
    """
    parsed = urlsplit(uri)

    if parsed.hostname:
        host = parsed.hostname
        path = parsed.path or "/"
        if parsed.query != "":
            path = f"{path}?{parsed.query}"
        # urlsplit already separated the port from the host (also for IPv6 literals).
        port = parsed.port
    else:
        # If there is no hostname, the given string is not a valid URI, so split on /
        authority, _, remainder = uri.partition("/")
        path = f"/{remainder}"
        host, _, port_text = authority.partition(":")
        port = int(port_text) if port_text else None

    if port is None:
        port = 443 if parsed.scheme == "https" else 80

    return host, port, path
def uri_from_url(url: str):
    """
    Builds an absolute "http://" URI from the specified URL.

    A URL without a recognisable host is first prefixed with "http://". The result consists of
    the network location, the path ("/" when the URL has none) and the query string if present.
    The scheme of the result is always "http".

    @param url: the URL to convert
    @return: the resulting URI
    """
    parts = urlsplit(url)
    if parts.hostname is None:
        # No host could be detected: re-parse with an explicit scheme in front.
        parts = urlsplit(f"http://{url}")

    location = "/" if parts.path == "" else parts.path
    uri = f"http://{parts.netloc}{location}"

    if parts.query == "":
        return uri
    return f"{uri}?{parts.query}"
def urljoin(base, url):
    """
    Resolves `url` against `base` and returns the resulting absolute URL.

    This delegates to `urllib.parse.urljoin`.

    @param base: the base URL
    @param url: the URL to resolve, possibly relative to `base`
    @return: the joined URL
    """
    joined = urllib.parse.urljoin(base, url)
    return joined
def get_charset(headers: Dict[str, str]):
    """
    Returns the charset of the content from the headers if found. Otherwise, returns `FORMAT`

    The charset is read from the `charset` parameter of the "content-type" header. The value may
    be wrapped in quotes, e.g. `charset="utf-8"`.

    @param headers: the headers to retrieve the charset from, keyed by lower-cased header name
    @return: A charset
    """
    if "content-type" in headers:
        content_type = headers["content-type"]
        # Allow an optional opening quote and require at least one charset character, so a
        # quoted or empty value never yields an empty charset name.
        match = re.search(r"charset\s*=\s*[\"']?([a-z0-9_.:+\-]+)", content_type, re.I)
        if match:
            return match.group(1)

    return FORMAT
def get_relative_save_path(path: str):
    """
    Expresses the specified path relative to the current working directory.

    @param path: the path to convert
    @raise ValueError: if the path is not located inside the working directory
    @return: the relative path as a string
    """
    target = pathlib.PurePath(path)
    working_dir = pathlib.PurePath(os.getcwd())
    return str(target.relative_to(working_dir))
def get_date():
    """
    Returns the current date and time formatted as an RFC 1123 date string.

    @return: the formatted date, e.g. "Sun, 28 Mar 2021 17:53:14 GMT"
    """
    # Convert the local wall-clock time to a POSIX timestamp for format_date_time.
    timestamp = mktime(datetime.now().timetuple())
    return format_date_time(timestamp)