Files
CN2021/server/command.py

240 lines
5.9 KiB
Python

import mimetypes
import os
import sys
from abc import ABC, abstractmethod
from datetime import datetime
from httplib import parser
from httplib.exceptions import NotFound, Forbidden, NotModified
from httplib.httpsocket import FORMAT
from httplib.message import RequestMessage as Message
# Directory served by the commands: the "public" folder located next to the
# script named in sys.argv[0].
CONTENT_ROOT = os.path.join(os.path.dirname(sys.argv[0]), "public")
# Reason phrases used for the status line, keyed by numeric status code.
# A status passed to `AbstractCommand._build_message` must be a key of this mapping.
status_message = {
    200: "OK",
    201: "Created",
    202: "Accepted",
    204: "No Content",
    304: "Not Modified",
    400: "Bad Request",
    404: "Not Found",
    500: "Internal Server Error",
}
def create(message: Message):
    """
    Build the command object that handles the given request.

    The concrete class is selected from the request method of the message.
    @param message: the request message to wrap in a command.
    @return: a `GetCommand`, `HeadCommand`, `PostCommand` or `PutCommand` instance.
    @raise ValueError: when the method is none of GET, HEAD, POST or PUT.
    """
    # The classes are resolved when the function runs, so they may be defined
    # further down in the module.
    handlers = {
        "GET": GetCommand,
        "HEAD": HeadCommand,
        "POST": PostCommand,
        "PUT": PutCommand,
    }
    handler = handlers.get(message.method)
    if handler is None:
        raise ValueError()
    return handler(message)
class AbstractCommand(ABC):
    """
    Base class for the handlers of the supported request methods.

    Subclasses provide `command`, `_conditional_headers` and `execute`; this
    class supplies the shared helpers for resolving the target file,
    evaluating conditional request headers and serialising the response.
    """
    path: str
    msg: Message

    def __init__(self, message: Message):
        """
        @param message: the request message this command acts upon.
        """
        self.msg = message

    @property
    @abstractmethod
    def command(self):
        """
        @return: the name of the request method handled by this command.
        """
        pass

    @property
    @abstractmethod
    def _conditional_headers(self):
        """
        @return: a mapping from request header name (as used to look it up in
        `self.msg.headers`) to the zero-argument callable that evaluates it.
        """
        pass

    @abstractmethod
    def execute(self):
        """
        Run the command.
        @return: the response to send back for the request.
        """
        pass

    def _build_message(self, status: int, content_type: str, body: bytes, extra_headers=None):
        """
        Serialise a response: status line, headers and optional body.

        The conditional request headers are evaluated first, so this method
        may raise whatever those handlers raise (e.g. `NotModified`) instead
        of returning.
        @param status: the status code; must be a key of `status_message`.
        @param content_type: the value for the Content-Type header. When empty,
        `application/octet-stream` is sent if the body is non-empty and no
        Content-Type header is sent otherwise.
        @param body: the payload bytes.
        @param extra_headers: optional mapping of additional header names to values.
        @return: the encoded response as bytes.
        """
        if extra_headers is None:
            extra_headers = {}
        self._process_conditional_headers()

        content_length = len(body)
        lines = [
            f"HTTP/1.1 {status} {status_message[status]}",
            f"Date: {parser.get_date()}",
            f"Content-Length: {content_length}",
        ]
        if content_type:
            type_header = f"Content-Type: {content_type}"
            # Textual content is announced together with the charset used on the wire.
            if content_type.startswith("text"):
                type_header += f"; charset={FORMAT}"
            lines.append(type_header)
        elif content_length > 0:
            lines.append("Content-Type: application/octet-stream")
        lines.extend(f"{name}: {value}" for name, value in extra_headers.items())

        # Every header line ends with CRLF; an empty line closes the header section.
        message = ("\r\n".join(lines) + "\r\n\r\n").encode(FORMAT)
        if content_length > 0:
            message += body
        return message

    def _get_path(self, check=True):
        """
        Map the request target onto a path below `CONTENT_ROOT`.

        A target of "/" maps to `index.html`.
        @param check: when true, require the resulting path to exist.
        @return: the file system path for the request target.
        @raise NotFound: when `check` is true and the path does not exist.
        """
        # NOTE(review): the normalised target is appended to CONTENT_ROOT by plain
        # string concatenation, so a target without a leading slash would end up
        # outside the "public" directory name - confirm that the parser rejects such targets.
        norm_path = os.path.normpath(self.msg.target.path)
        if norm_path == "/":
            path = CONTENT_ROOT + "/index.html"
        else:
            path = CONTENT_ROOT + norm_path
        if check and not os.path.exists(path):
            raise NotFound(path)
        return path

    def _process_conditional_headers(self):
        """
        Invoke the handler of every conditional header that is present (with a
        non-empty value) in the request.
        """
        for header, handler in self._conditional_headers.items():
            if self.msg.headers.get(header):
                handler()

    def _if_modified_since(self):
        """
        Evaluate the `if-modified-since` request header against the
        modification time of the target file.

        @return: True when the request must be processed normally: the header
        is absent, its value cannot be parsed, or the file changed after the given date.
        @raise NotModified: when the file was last modified at or before the given date.
        """
        from datetime import timezone

        date_val = self.msg.headers.get("if-modified-since")
        if not date_val:
            return True
        mtime = os.path.getmtime(self._get_path(False))
        # The header only has a resolution of one second, so the sub-second part
        # of the mtime is dropped; otherwise a file modified within the very
        # second named by the header would always be reported as changed.
        modified = datetime.fromtimestamp(mtime, tz=timezone.utc).replace(tzinfo=None, microsecond=0)
        try:
            min_date = datetime.strptime(date_val, '%a, %d %b %Y %H:%M:%S GMT')
        except ValueError:
            # An unparsable date is ignored instead of failing the request.
            return True
        if modified <= min_date:
            raise NotModified(f"{modified} <= {min_date}")
        return True

    def get_mimetype(self, path):
        """
        Determine the MIME type of a file.

        The type is guessed from the file name first. When that yields nothing,
        the first line of the file is decoded with `FORMAT` to tell text from binary content.
        @param path: the path of the file to inspect.
        @return: the guessed type, otherwise `text/plain` when the first line
        decodes and `application/octet-stream` when it does not.
        """
        mime = mimetypes.guess_type(path)[0]
        if mime:
            return mime
        try:
            # The context manager closes the file even when decoding fails.
            with open(path, "r", encoding=FORMAT) as file:
                file.readline()
        except UnicodeDecodeError:
            return "application/octet-stream"
        return "text/plain"
class AbstractModifyCommand(AbstractCommand, ABC):
    """
    Shared implementation for the commands that write the request body to the
    target file. Subclasses select how the file is opened through `_file_mode`.
    """

    @property
    @abstractmethod
    def _file_mode(self):
        """
        @return: the mode used to open the target file, without the binary flag
        (which `execute` appends).
        """
        pass

    @property
    def _conditional_headers(self):
        """
        @return: an empty mapping; no conditional header is evaluated for these commands.
        """
        return dict()

    def execute(self):
        """
        Write the request body to the target file and build the response.

        @return: the encoded response, status 204 when the file already existed
        and 201 otherwise, with a Location header for the target.
        @raise Forbidden: when the parent directory is missing or is not a
        directory, or when the target itself is a directory.
        """
        target = self._get_path(False)
        parent = os.path.dirname(target)

        # The parent directory has to be present already and must really be a directory.
        if not os.path.exists(parent):
            raise Forbidden("Target directory does not exists!")
        if os.path.exists(parent) and not os.path.isdir(parent):
            raise Forbidden("Target directory is an existing file!")

        # Remember whether the file was there before writing: it decides the status code.
        existed_before = os.path.exists(target)
        try:
            with open(target, mode=f"{self._file_mode}b") as handle:
                handle.write(self.msg.body)
        except IsADirectoryError:
            raise Forbidden("The target resource is a directory!")

        status = 204 if existed_before else 201
        location = parser.urljoin("/", os.path.relpath(target, CONTENT_ROOT))
        return self._build_message(status, "text/plain", b"", {"Location": location})
class HeadCommand(AbstractCommand):
    """
    Command for HEAD requests: responds with status 200 and the MIME type of
    the target file, always with an empty body.
    """

    @property
    def command(self):
        """
        @return: the request method name, "HEAD".
        """
        return "HEAD"

    @property
    def _conditional_headers(self):
        """
        @return: the mapping of supported conditional headers to their handlers.
        """
        handlers = {}
        handlers['if-modified-since'] = self._if_modified_since
        return handlers

    def execute(self):
        """
        @return: the encoded response for the target file, without body.
        """
        target = self._get_path()
        return self._build_message(200, self.get_mimetype(target), b"")
class GetCommand(AbstractCommand):
    """
    Command for GET requests: responds with status 200 and the contents of
    the target file.
    """

    @property
    def command(self):
        """
        @return: the request method name, "GET".
        """
        return "GET"

    @property
    def _conditional_headers(self):
        """
        @return: the mapping of supported conditional headers to their handlers.
        """
        return {'if-modified-since': self._if_modified_since}

    def execute(self):
        """
        Read the target file and build the response carrying its contents.
        @return: the encoded response with the file contents as body.
        """
        path = self._get_path()
        mime = self.get_mimetype(path)
        # The context manager releases the file handle even when reading fails.
        with open(path, "rb") as file:
            buffer = file.read()
        return self._build_message(200, mime, buffer)
class PostCommand(AbstractModifyCommand):
    """
    Command for POST requests: the inherited `execute` opens the target file
    in append mode, so the request body is added to the end of the file.
    """

    @property
    def command(self):
        """
        @return: the request method name, "POST".
        """
        return "POST"

    @property
    def _file_mode(self):
        """
        @return: "a", the append mode.
        """
        return "a"
class PutCommand(AbstractModifyCommand):
    """
    Command for PUT requests: the inherited `execute` opens the target file
    in write mode, so the request body replaces the contents of the file.
    """

    @property
    def command(self):
        """
        @return: the request method name, "PUT".
        """
        return "PUT"

    @property
    def _file_mode(self):
        """
        @return: "w", the write (truncate) mode.
        """
        return "w"