import mimetypes
import os
import sys
from abc import ABC, abstractmethod
from datetime import datetime

from httplib import parser
from httplib.exceptions import NotFound, Forbidden, NotModified, BadRequest
from httplib.httpsocket import FORMAT
from httplib.message import RequestMessage as Message

# Root of the served files: the "public" directory located next to the script
# path found in sys.argv[0].
CONTENT_ROOT = os.path.join(os.path.dirname(sys.argv[0]), "public")

# Reason phrases keyed by HTTP status code; `_build_message` looks up the
# phrase for the status line here.
status_message = {
    200: "OK",
    201: "Created",
    202: "Accepted",
    204: "No Content",
    304: "Not Modified",
    400: "Bad Request",
    404: "Not Found",
    500: "Internal Server Error",
}


def create(message: Message):
    """
    Creates a Command based on the specified message

    @param message: the message to create the Command with.
    @return: An instance of `AbstractCommand`
    @raise ValueError: if the request method is not GET, HEAD, POST or PUT.
    """
    if message.method == "GET":
        return GetCommand(message)
    elif message.method == "HEAD":
        return HeadCommand(message)
    elif message.method == "POST":
        return PostCommand(message)
    elif message.method == "PUT":
        return PutCommand(message)
    else:
        raise ValueError(f"Unsupported request method: {message.method!r}")


class AbstractCommand(ABC):
    """
    Base class of all commands. A command wraps one request message and
    produces the raw response bytes through `execute`.
    """
    path: str
    msg: Message

    def __init__(self, message: Message):
        """
        @param message: the request message this command operates on.
        """
        self.msg = message

    @property
    @abstractmethod
    def command(self):
        """
        @return: the name of the HTTP method handled by this command.
        """
        pass

    @property
    @abstractmethod
    def _conditional_headers(self):
        """
        @return: a mapping of lower-case header name to the zero-argument
            callable which evaluates that conditional header.
        """
        pass

    @abstractmethod
    def execute(self):
        """
        Executes the command.

        @return: the encoded response, as built by `_build_message`.
        """
        pass

    def _build_message(self, status: int, content_type: str, body: bytes, extra_headers=None):
        """
        Builds the encoded response. The conditional headers of the request
        are processed first, so this may raise instead of returning.

        @param status: the status code, must be a key of `status_message`.
        @param content_type: the mime type of the body; when empty and the
            body is not, `application/octet-stream` is used.
        @param body: the payload, appended after the headers when not empty.
        @param extra_headers: optional mapping of additional header names to
            their values; `None` means no additional headers.
        @return: the status line, headers and body as bytes.
        """
        self._process_conditional_headers()

        content_length = len(body)
        lines = [
            f"HTTP/1.1 {status} {status_message[status]}",
            f"Date: {parser.get_date()}",
            f"Content-Length: {content_length}",
        ]

        if content_type:
            if content_type.startswith("text"):
                lines.append(f"Content-Type: {content_type}; charset={FORMAT}")
            else:
                lines.append(f"Content-Type: {content_type}")
        elif content_length > 0:
            lines.append("Content-Type: application/octet-stream")

        # extra_headers defaults to None (GET and HEAD pass nothing), which
        # must not be iterated.
        for name, value in (extra_headers or {}).items():
            lines.append(f"{name}: {value}")

        # Every header line ends with CRLF and an empty line closes the header section.
        message = ("\r\n".join(lines) + "\r\n\r\n").encode(FORMAT)
        if content_length > 0:
            message += body

        return message

    def _get_path(self, check=True):
        """
        Maps the request target onto a path below `CONTENT_ROOT`. The target
        "/" maps to "index.html".

        @param check: when True, the resulting path has to exist.
        @return: the path on the file system.
        @raise NotFound: if `check` is True and the path does not exist.
        """
        norm_path = os.path.normpath(self.msg.target.path)

        if norm_path == "/":
            path = CONTENT_ROOT + "/index.html"
        else:
            path = CONTENT_ROOT + norm_path

        if check and not os.path.exists(path):
            raise NotFound(path)

        return path

    def _process_conditional_headers(self):
        """
        Calls the handler of every conditional header which is present, with
        a non-empty value, in the request.
        """
        for header, handler in self._conditional_headers.items():
            if self.msg.headers.get(header):
                handler()

    def _if_modified_since(self):
        """
        Evaluates the "if-modified-since" header against the modification
        time of the target file. A missing or unparsable header is ignored.

        @return: True when the request may proceed.
        @raise NotModified: if the file was not modified after the given date.
        """
        date_val = self.msg.headers.get("if-modified-since")
        if not date_val:
            return True

        modified = datetime.utcfromtimestamp(os.path.getmtime(self._get_path(False)))
        try:
            min_date = datetime.strptime(date_val, '%a, %d %b %Y %H:%M:%S GMT')
        except ValueError:
            # The date is not in the expected format: treat the header as absent.
            return True

        if modified <= min_date:
            raise NotModified(f"{modified} <= {min_date}")

        return True

    def get_mimetype(self, path):
        """
        Guesses the mime type of the specified file, first by its name and
        otherwise by trying to decode its first line as text.

        @param path: the path of the file.
        @return: the guessed mime type, "text/plain" if the first line can be
            decoded using `FORMAT` or "application/octet-stream" if it cannot.
        """
        mime = mimetypes.guess_type(path)[0]
        if mime:
            return mime

        try:
            # The context manager closes the file even when decoding fails.
            with open(path, "r", encoding=FORMAT) as file:
                file.readline()
            return "text/plain"
        except UnicodeDecodeError:
            return "application/octet-stream"


class AbstractModifyCommand(AbstractCommand, ABC):
    """
    Base class of the commands which write the request body to the target
    file. Subclasses select how the file is opened through `_file_mode`.
    """

    @property
    @abstractmethod
    def _file_mode(self):
        """
        @return: the mode character used to open the target file; "b" is
            appended to it by `execute`.
        """
        pass

    @property
    def _conditional_headers(self):
        return {}

    def execute(self):
        """
        Writes the request body to the target file.

        @return: the encoded response with status 204 if the file already
            existed or 201 if it did not, including a Location header.
        @raise Forbidden: if the parent directory does not exist or is not a
            directory, or if the target itself is a directory.
        """
        path = self._get_path(False)
        directory = os.path.dirname(path)

        if not os.path.exists(directory):
            raise Forbidden("Target directory does not exists!")
        if not os.path.isdir(directory):
            raise Forbidden("Target directory is an existing file!")

        # Has to be determined before opening: opening creates the file.
        exists = os.path.exists(path)

        try:
            with open(path, mode=f"{self._file_mode}b") as file:
                file.write(self.msg.body)
        except IsADirectoryError:
            raise Forbidden("The target resource is a directory!")

        status = 204 if exists else 201

        location = parser.urljoin("/", os.path.relpath(path, CONTENT_ROOT))
        return self._build_message(status, "text/plain", b"", {"Location": location})


class HeadCommand(AbstractCommand):
    """
    Command for HEAD requests: responds with headers only.
    """

    @property
    def command(self):
        return "HEAD"

    @property
    def _conditional_headers(self):
        return {'if-modified-since': self._if_modified_since}

    def execute(self):
        """
        @return: the encoded response without a body.
        @raise NotFound: if the target does not exist.
        """
        path = self._get_path()
        mime = self.get_mimetype(path)

        # NOTE: an empty body is passed, so Content-Length is reported as 0.
        return self._build_message(200, mime, b"")


class GetCommand(AbstractCommand):
    """
    Command for GET requests: responds with the content of the target file.
    """

    @property
    def command(self):
        return "GET"

    @property
    def _conditional_headers(self):
        return {'if-modified-since': self._if_modified_since}

    def execute(self):
        """
        @return: the encoded response with the file content as body.
        @raise NotFound: if the target does not exist.
        """
        path = self._get_path()
        mime = self.get_mimetype(path)

        with open(path, "rb") as file:
            buffer = file.read()

        return self._build_message(200, mime, buffer)


class PostCommand(AbstractModifyCommand):
    """
    Command for POST requests: appends the request body to the target file.
    """

    @property
    def command(self):
        return "POST"

    @property
    def _file_mode(self):
        return "a"


class PutCommand(AbstractModifyCommand):
    """
    Command for PUT requests: overwrites the target file with the request body.
    """

    @property
    def command(self):
        return "PUT"

    @property
    def _file_mode(self):
        return "w"

    def execute(self):
        """
        @return: the encoded response, see `AbstractModifyCommand.execute`.
        @raise BadRequest: if the request contains a Content-Range header.
        """
        if "content-range" in self.msg.headers:
            raise BadRequest("PUT request contains Content-Range header")

        # The response built by the parent has to be handed back to the caller.
        return super().execute()