Files
CN2021/httplib/parser.py

190 lines
5.1 KiB
Python

import logging
import re
import urllib
from typing import Dict
from urllib.parse import urlparse, urlsplit
from httplib.exceptions import InvalidStatusLine, InvalidResponse, BadRequest, InvalidRequestLine
from httplib.httpsocket import FORMAT
def _is_valid_http_version(http_version: str) -> bool:
    """
    Returns True if the specified HTTP-version is valid.

    Only "HTTP/1.0" and "HTTP/1.1" are accepted. The protocol name is case-sensitive and nothing may precede or
    follow the version.

    @param http_version: the string to be checked
    @return: True if the specified HTTP-version is valid.
    """
    # fullmatch anchors both ends, so "HTTP/1.10" or "HTTP/1.1x" are rejected. The class is [01] and not [0|1]:
    # inside a character class "|" is a literal character, which would make "HTTP/1.|" pass.
    return re.fullmatch(r"HTTP/1\.[01]", http_version) is not None
def parse_status_line(line: str):
    """
    Parses the specified line as an HTTP status-line.

    @param line: the status-line to be parsed
    @raise InvalidStatusLine: if the line doesn't consist of an HTTP-version, a status code and a reason phrase, if
        the HTTP-version is invalid or if the status code is not a three-digit number of at least 100
    @return: tuple of the HTTP-version number (e.g. "1.1"), the status code as int and the reason phrase
    """
    # Split at most twice so the reason phrase keeps its internal spaces; empty fields are discarded.
    parts = [part for part in line.strip().split(" ", 2) if part]
    if len(parts) < 3:
        raise InvalidStatusLine(line)

    http_version, status, reason = parts

    if not _is_valid_http_version(http_version):
        raise InvalidStatusLine(line)
    # Everything after the "HTTP/" prefix is the version number, the same value parse_request_line returns.
    version = http_version.split("/", 1)[1]

    # fullmatch (rather than match) so that input such as "200abc" is reported as an invalid status line
    # instead of escaping as a ValueError from int().
    if not re.fullmatch(r"[0-9]{3}", status):
        raise InvalidStatusLine(line)
    status_code = int(status)
    # Three digits guarantee the upper bound; only values such as "099" still have to be rejected.
    if status_code < 100:
        raise InvalidStatusLine(line)

    return version, status_code, reason
def parse_request_line(line: str):
    """
    Parses the specified line as an HTTP request-line.

    Returns the method, the target as the result of urlsplit and the HTTP version number from the request-line.

    @param line: the request-line to be parsed
    @raise InvalidRequestLine: if the line couldn't be parsed.
    @raise BadRequest: Invalid HTTP method, Invalid HTTP-version or Invalid target
    @return: tuple of the method, target and HTTP-version number (e.g. "1.1")
    """
    # Split at most twice; empty fields (caused by consecutive spaces) are discarded.
    parts = [part for part in line.rstrip().split(" ", 2) if part]
    if len(parts) < 3:
        raise InvalidRequestLine(line)

    method, target, version = parts

    if method not in ("CONNECT", "DELETE", "GET", "HEAD", "OPTIONS", "POST", "PUT", "TRACE"):
        raise BadRequest(f"Invalid method: {method}")

    if not _is_valid_http_version(version):
        logging.debug("[ABRT] request: invalid http-version=%r", version)
        raise BadRequest(f"Invalid HTTP-version: {version}")

    # Defensive guard: empty fields were already dropped above, so this only triggers if that changes.
    if not target:
        raise BadRequest()

    parsed_target = urlsplit(target)

    # Only the number after "HTTP/" is returned.
    return method, parsed_target, version.split("/")[1]
def parse_headers(lines):
    """
    Parses HTTP header lines into a dict.

    Lines are taken from the iterator up to and including the empty line that ends the header section, or until
    the iterator is exhausted. Lines starting with whitespace before the first header are skipped. Later lines
    starting with whitespace are treated as a continuation of the previous header and joined to it with a single
    space. Lines without a colon, with an empty name or with nothing after the colon are ignored. When a header
    occurs more than once, the last occurrence wins.

    @param lines: iterator (must support next()) yielding the lines that follow the start-line
    @raise InvalidResponse: if check_next_header rejects a header
    @return: dict mapping the lower-cased header names to their lower-cased values
    """
    terminators = ("\r\n", "\n", "")
    headers = []

    try:
        line = next(lines)

        # The first header may not start with whitespace: consume such lines without processing them.
        # The terminator test comes first because "\r\n" itself starts with a whitespace character.
        while line not in terminators and line[0].isspace():
            line = next(lines)

        while line not in terminators:
            if line[0].isspace():
                # Folded header: headers is never empty here, the first accepted line is always a new header.
                continuation = line.strip()
                if continuation:
                    headers[-1] = f"{headers[-1]} {continuation}"
            else:
                headers.append(line.rstrip("\r\n"))
            line = next(lines)
    except StopIteration:
        # No more lines to be parsed
        pass

    result = {}
    for line in headers:
        pos = line.find(":")
        if pos <= 0 or pos >= len(line) - 1:
            continue

        header, value = map(str.strip, line.split(":", 1))
        check_next_header(result, header, value)
        result[header.lower()] = value.lower()

    return result
def check_next_header(headers, next_header: str, next_value: str):
    """
    Validates a header that is about to be added to the headers parsed so far.

    Only Content-Length is checked: it may occur once and its value must consist of ASCII digits. A value of 0 is
    accepted. All other headers pass unchecked.

    @param headers: the headers parsed so far; looked up with the lower-case key "content-length"
    @param next_header: name of the header about to be added, in any letter case
    @param next_value: value of the header about to be added
    @raise InvalidResponse: if Content-Length is specified more than once or its value is not a non-negative
        integer
    """
    # Header names are case-insensitive, so "Content-Length" has to be recognised as well.
    if next_header.lower() != "content-length":
        return

    if "content-length" in headers:
        logging.error("Multiple content-length headers specified")
        raise InvalidResponse()

    # Explicit ASCII digits: str.isnumeric() also accepts characters such as superscripts that int() rejects.
    if not re.fullmatch(r"[0-9]+", next_value):
        logging.error("Invalid content-length value: %r", next_value)
        raise InvalidResponse()
def parse_uri(uri: str):
    """
    Splits the specified URI into the host, the port and the path (including the query).

    Both absolute URIs ("http://host:8080/path?q") and scheme-less ones ("host:8080/path") are supported. When no
    port is specified, 443 is used for the "https" scheme and 80 otherwise. The path always starts with "/".

    @param uri: the URI to be split
    @raise ValueError: if the URI contains a port that is not a valid number
    @return: tuple of the host, the port as int and the path
    """
    parsed = urlsplit(uri)
    port = None

    if parsed.hostname:
        host = parsed.hostname
        # hostname never includes the port, it has to be read separately.
        port = parsed.port
        path = parsed.path or "/"
        if parsed.query:
            path = f"{path}?{parsed.query}"
    else:
        # If there is no netloc, the given string is not a valid URI, so split on the first "/".
        host, _, rest = uri.partition("/")
        path = f"/{rest}"
        if ":" in host:
            host, port_text = host.split(":", 1)
            if port_text:
                port = int(port_text)

    if port is None:
        port = 443 if parsed.scheme == "https" else 80

    return host, port, path
def get_uri(url: str):
    """
    Builds an "http://" URI from the network location, path and query of the specified URL.

    The scheme of the specified URL is not carried over and the fragment is dropped.

    @param url: the URL to be converted
    @return: the resulting URI
    """
    parts = urlsplit(url)
    base = "http://" + parts.netloc + parts.path

    # Only append the "?" separator when there is a query.
    return base if parts.query == "" else f"{base}?{parts.query}"
def urljoin(base, url):
    """
    Joins a base URL and a (possibly relative) URL to form an absolute URL.

    Delegates to urllib.parse.urljoin.

    @param base: the URL to resolve against
    @param url: the URL to be resolved
    @return: the joined URL
    """
    absolute = urllib.parse.urljoin(base, url)
    return absolute
def get_charset(headers: Dict[str, str]):
    """
    Returns the charset specified in the Content-Type header.

    The charset parameter is matched case-insensitively and may be enclosed in quotes. If there is no Content-Type
    header, or it carries no (non-empty) charset, the FORMAT constant imported from httplib.httpsocket is returned.

    @param headers: the headers, looked up with the lower-case key "content-type"
    @return: the charset as it appears in the header, or FORMAT
    """
    content_type = headers.get("content-type", "")

    # The optional quote covers charset="utf-8". The "+" quantifier prevents returning an empty charset, and the
    # class includes "_", ".", ":" and "+" so that names such as "iso_8859-1" aren't cut short.
    match = re.search(r"\bcharset\s*=\s*[\"']?([a-z0-9_.:+\-]+)", content_type, re.I)
    if match:
        return match.group(1)

    return FORMAT