# coding=utf-8
from __future__ import annotations
import mimetypes
import os
import shutil
import zipstream
from inginious.common.filesystems import FileSystemProvider
[docs]class LocalFSProvider(FileSystemProvider):
"""
A FileSystemProvider that uses a real on-disk filesystem
"""
[docs] @classmethod
def get_needed_args(cls):
""" Returns a list of arguments needed to create a FileSystemProvider. In the form
{
"arg1": (int, False, "description1"),
"arg2: (str, True, "description2")
}
The first part of the tuple is the type, the second indicates if the arg is mandatory
Only int and str are supported as types.
"""
return {
"location": (str, True, "On-disk path to the directory containing courses/tasks")
}
[docs] @classmethod
def init_from_args(cls, location): # pylint: disable=arguments-differ
""" Given the args from get_needed_args, creates the FileSystemProvider """
return LocalFSProvider(location)
[docs] def from_subfolder(self, subfolder: str) -> LocalFSProvider:
self._checkpath(subfolder)
return LocalFSProvider(self.prefix + subfolder)
[docs] def exists(self, path: str=None) -> bool:
if path is None:
path = self.prefix
else:
path = os.path.join(self.prefix, path)
return os.path.exists(path)
[docs] def ensure_exists(self):
if not os.path.exists(self.prefix):
os.makedirs(self.prefix)
[docs] def put(self, filepath, content):
self._checkpath(filepath)
fullpath = os.path.join(self.prefix, filepath)
if "/" in fullpath:
os.makedirs(os.path.join(*(os.path.split(fullpath)[:-1])), exist_ok=True)
if isinstance(content, str):
content = content.encode("utf-8")
open(fullpath, 'wb').write(content)
[docs] def get_fd(self, filepath: str, timestamp=None):
self._checkpath(filepath)
return open(os.path.join(self.prefix, filepath), 'rb')
[docs] def get(self, filepath, timestamp=None):
return self.get_fd(filepath, timestamp).read()
[docs] def list(self, folders: bool=True, files: bool=True, recursive: bool=False) -> list:
if recursive:
output = []
for root, subdirs, listed_files in os.walk(self.prefix):
if folders:
output += [root+"/"+d for d in subdirs]
if files:
output += [root+"/"+f for f in listed_files]
output = [os.path.relpath(f, self.prefix) for f in output]
else:
if files and folders:
condition = lambda x: True
elif files and not folders:
condition = lambda x: os.path.isfile(os.path.join(self.prefix, x))
elif folders and not files:
condition = lambda x: os.path.isdir(os.path.join(self.prefix, x))
else:
return []
output = [f for f in os.listdir(self.prefix) if condition(f)]
isdir = lambda x: '/' if os.path.isdir(os.path.join(self.prefix, x)) else ''
return [f+isdir(f) for f in output]
[docs] def delete(self, filepath: str=None):
if filepath is None:
filepath = self.prefix
else:
self._checkpath(filepath)
filepath = os.path.join(self.prefix, filepath)
if os.path.isdir(filepath):
shutil.rmtree(filepath)
else:
os.unlink(filepath)
[docs] def get_last_modification_time(self, filepath):
self._checkpath(filepath)
try:
return os.stat(os.path.join(self.prefix, filepath)).st_mtime
except:
raise FileNotFoundError()
[docs] def move(self, src, dest):
self._checkpath(src)
self._checkpath(dest)
if "/" in dest:
os.makedirs(os.path.join(self.prefix, *(os.path.split(dest)[:-1])), exist_ok=True)
shutil.move(os.path.join(self.prefix, src), os.path.join(self.prefix, dest))
[docs] def copy_to(self, src_disk, dest=None):
if dest is None:
dest = self.prefix
else:
self._checkpath(dest)
dest = os.path.join(self.prefix, dest)
self._recursive_overwrite(src_disk, dest)
[docs] def copy_from(self, src, dest_disk):
if src is None:
src = self.prefix
else:
self._checkpath(src)
src = os.path.join(self.prefix, src)
self._recursive_overwrite(src, dest_disk)
def _recursive_overwrite(self, src, dest):
"""
Copy src to dest, recursively and with file overwrite.
"""
if os.path.isdir(src):
if not os.path.isdir(dest):
os.makedirs(dest)
files = os.listdir(src)
for file in files:
self._recursive_overwrite(os.path.join(src, file),
os.path.join(dest, file))
else:
dirname = os.path.dirname(dest)
if not os.path.exists(dirname):
os.makedirs(dirname)
shutil.copy2(src, dest, follow_symlinks=False)
[docs] def distribute(self, filepath, allow_folders=True):
self._checkpath(filepath)
path = os.path.abspath(os.path.join(self.prefix, filepath))
if not os.path.exists(path):
return "invalid", None, None
if os.path.isdir(path):
if not allow_folders:
return "invalid", None, None
zipf = zipstream.ZipFile()
for root, _, files in os.walk(path):
for filename in files:
file_path = os.path.join(root, filename)
arcpath = os.path.relpath(file_path, path)
zipf.write(file_path, arcpath)
return "local", "application/zip", zipf
elif os.path.isfile(path):
mimetypes.init()
mime_type = mimetypes.guess_type(path)
return "local", mime_type[0] or "application/octet-stream", open(path, 'rb')