Source code for inginious.frontend.courses

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.

""" A course class with some modification for users """
from __future__ import annotations

import copy
import gettext
import hashlib
import re
import os
import logging
from typing import Iterable, List, Any
from pylti1p3.tool_config import ToolConfDict
from datetime import datetime

from inginious.common.filesystems import FileSystemProvider, fetch_or_cache, invalidate_cache, get_fs_provider
from inginious.common.tags import Tag
from inginious.common.base import id_checker, get_json_or_yaml, loads_json_or_yaml
from inginious.frontend.accessible_time import AccessibleTime
from inginious.frontend.parsable_text import ParsableText
from inginious.frontend.user_manager import UserInfo
from inginious.frontend.task_dispensers.toc import TableOfContents
from inginious.frontend.plugins import plugin_manager
from inginious.frontend.task_dispensers import get_task_dispensers
from inginious.frontend.tasks import Task
from inginious.common.exceptions import InvalidNameException, CourseNotFoundException, CourseUnreadableException


def _load_course(course_fs : FileSystemProvider, courseid : str):
    # Try to open the course file
    try:
        logging.getLogger("inginious.course").info("Caching course %s", courseid)
        task_content = loads_json_or_yaml("course.yaml", course_fs.get("course.yaml"))
    except Exception as e:
        raise CourseUnreadableException(str(e))

    return Course(courseid, task_content)

[docs] class Course(object): """ A course with some modification for users """ def __init__(self, courseid, content): self._id = courseid self._content = content self._fs = get_fs_provider().from_subfolder(courseid) self._new_doc = not self._fs.exists() self._translations = {} try: self._name = self._content['name'] except: raise Exception("Course has an invalid name: " + self.get_id()) if self._content.get('nofrontend', False): raise Exception("That course is not allowed to be displayed directly in the webapp") try: self._admins = self._content.get('admins', []) self._tutors = self._content.get('tutors', []) self._description = self._content.get('description', '') self._accessible = AccessibleTime(self._content.get("accessible", None)) self._registration = AccessibleTime(self._content.get("registration", None)) self._registration_password = self._content.get('registration_password', None) self._registration_ac = self._content.get('registration_ac', None) if self._registration_ac not in [None, "username", "binding", "email"]: raise Exception("Course has an invalid value for registration_ac: " + self.get_id()) self._registration_ac_accept = self._content.get('registration_ac_accept', True) self._registration_ac_list = self._content.get('registration_ac_list', []) self._groups_student_choice = self._content.get("groups_student_choice", False) self._allow_unregister = self._content.get('allow_unregister', True) self._allow_preview = self._content.get('allow_preview', False) self._is_lti = self._content.get('is_lti', False) self._is_archive = self._content.get('archived', False) self._archive_date = datetime.fromisoformat(self._content["archive_date"]).astimezone() if "archive_date" in self._content else None self._lti_url = self._content.get('lti_url', '') self._lti_keys = self._content.get('lti_keys', {}) self._lti_config = self._content.get('lti_config', {}) self._lti_send_back_grade = self._content.get('lti_send_back_grade', False) self._tags = {key: Tag(key, tag_dict, self.gettext) for key, tag_dict in self._content.get("tags", {}).items()} task_dispenser_class = get_task_dispensers().get(self._content.get('task_dispenser', 'toc'), TableOfContents) # Here we use a lambda to ensure we do not pass a fixed list of tasks to the task dispenser self._task_dispenser = task_dispenser_class(lambda: self.get_tasks(), self._content.get("dispenser_data", {}), self.get_id()) except: raise Exception("Course has an invalid YAML spec: " + self.get_id()) # Force some parameters if LTI is active if self.is_lti(): self._accessible = AccessibleTime(True) self._registration = AccessibleTime(False) self._registration_password = None self._registration_ac = None self._registration_ac_list = [] self._groups_student_choice = False self._allow_unregister = False else: self._lti_keys = {} self._lti_config = {} self._lti_url = '' self._lti_send_back_grade = False # Build the regex for the ACL, allowing for fast matching. Only used internally. self._registration_ac_regex = self._build_ac_regex(self._registration_ac_list)
[docs] def set_translations(self, translations : dict[str, gettext.GNUTranslations]): self._translations = translations
[docs] def get_translation_obj(self, language): return self._translations.get(language, gettext.NullTranslations())
[docs] def gettext(self, language, text): return self.get_translation_obj(language).gettext(text) if text else ""
[docs] def get_id(self): """ Return the _id of this course """ return self._id
[docs] def get_fs(self): """ Returns a FileSystemProvider which points to the folder of this course """ return self._fs
[docs] def get_task(self, taskid): """ Returns a Task object """ return Task.get(self._id, taskid)
[docs] def get_descriptor(self): """ Get (a copy) the description of the course """ return copy.deepcopy(self._content)
[docs] def get_staff(self): """ Returns a list containing the usernames of all the staff users """ return list(set(self.get_tutors() + self.get_admins()))
[docs] def get_admins(self): """ Returns a list containing the usernames of the administrators of this course """ return self._admins
[docs] def get_tutors(self): """ Returns a list containing the usernames of the tutors assigned to this course """ return self._tutors
[docs] def is_open_to_non_staff(self): """ Returns true if the course is accessible by users that are not administrator of this course """ return self.get_accessibility().is_open()
[docs] def is_registration_possible(self, user_info: UserInfo): """ Returns true if users can register for this course """ return self.get_accessibility().is_open() and self._registration.is_open() and self.is_user_accepted_by_access_control(user_info)
[docs] def is_password_needed_for_registration(self): """ Returns true if a password is needed for registration """ return self._registration_password is not None
[docs] def get_registration_password(self): """ Returns the password needed for registration (None if there is no password) """ return self._registration_password
[docs] def get_accessibility(self, plugin_override=True): """ Return the AccessibleTime object associated with the accessibility of this course """ if self.is_archive(): return AccessibleTime(False) vals = plugin_manager.call_hook('course_accessibility', course=self, default=self._accessible) return vals[0] if len(vals) and plugin_override else self._accessible
[docs] def get_registration_accessibility(self): """ Return the AccessibleTime object associated with the registration """ return self._registration
[docs] def get_readable_tasks(self): """ Returns the list of all available tasks in a course """ return [ task[0:len(task)-1] # remove trailing / for task in self._fs.list(folders=True, files=False, recursive=False) if self._fs.from_subfolder(task).exists("task.yaml") ] if self._fs.exists() else []
[docs] def get_tasks(self) -> dict[str, Task]: """ Returns """ tasks = self.get_readable_tasks() output = {} for task in tasks: try: output[task] = self.get_task(task) except Exception as e: logging.getLogger("inginious.course." + self._id).info("Couldn't load task %s : %s", task, str(e)) return output
[docs] def get_access_control_method(self): """ Returns either None, "username", "binding", or "email", depending on the method used to verify that users can register to the course """ return self._registration_ac
[docs] def get_access_control_accept(self): """ Returns either True (accept) or False (deny), depending on the control type used to verify that users can register to the course """ return self._registration_ac_accept
[docs] def get_access_control_list(self) -> List[str]: """ Returns the list of all users/emails/binding methods/... (see get_access_control_method) allowed by the AC list """ return self._registration_ac_list
[docs] def can_students_choose_group(self): """ Returns True if the students can choose their groups """ return self._groups_student_choice
[docs] def is_lti(self): """ True if the current course is in LTI mode """ return self._is_lti
[docs] def lti_keys(self): """ {name: key} for the LTI customers """ return self._lti_keys if self._is_lti else {}
[docs] def lti_config(self): """ LTI Tool config dictionary. Specs are at https://github.com/dmitry-viskov/pylti1.3/blob/master/README.rst?plain=1#L70-L98 """ return self._lti_config if self._is_lti else {}
[docs] def lti_tool(self) -> ToolConfDict: """ LTI Tool object. """ lti_tool = ToolConfDict(self._lti_config) for iss in self._lti_config: for client_config in self._lti_config[iss]: lti_tool.set_private_key(iss, client_config['private_key'], client_id=client_config['client_id']) lti_tool.set_public_key(iss, client_config['public_key'], client_id=client_config['client_id']) return lti_tool
[docs] def lti_platform_instances_ids(self) -> Iterable[str]: """ Set of LTI Platform instance ids registered for this course. """ for iss in self._lti_config: for client_config in self._lti_config[iss]: for deployment_id in client_config['deployment_ids']: yield '/'.join([iss, client_config['client_id'], deployment_id])
[docs] def lti_keyset_hash(self, issuer: str, client_id: str) -> str: return hashlib.md5((issuer + client_id).encode('utf-8')).digest().hex()
[docs] def lti_url(self): """ Returns the URL to the external platform the course is hosted on """ return self._lti_url
[docs] def lti_send_back_grade(self): """ True if the current course should send back grade to the LTI Tool Consumer """ return self._is_lti and self._lti_send_back_grade
[docs] def is_user_accepted_by_access_control(self, user_info: UserInfo): """ Returns True if the user is allowed by the ACL """ if self.get_access_control_method() is None: return True keys_per_access_control_method = { "username": (lambda: [user_info.username]), "email": (lambda: [user_info.email]), "binding": (lambda: user_info.bindings.keys()) } if not user_info or self.get_access_control_method() not in keys_per_access_control_method: return False # check that at least one key matches in the list keys = keys_per_access_control_method[self.get_access_control_method()]() at_least_one = any(self._registration_ac_regex.fullmatch(key) for key in keys) return at_least_one if self.get_access_control_accept() else not at_least_one
[docs] def allow_preview(self): return self._allow_preview
[docs] def allow_unregister(self, plugin_override=True): """ Returns True if students can unregister from course """ vals = plugin_manager.call_hook('course_allow_unregister', course=self, default=self._allow_unregister) return vals[0] if len(vals) and plugin_override else self._allow_unregister
[docs] def get_name(self, language): """ Return the name of this course """ return self.gettext(language, self._name) if self._name else ""
[docs] def get_description(self, language): """Returns the course description """ description = self.gettext(language, self._description) if self._description else '' return ParsableText(description, "rst")
[docs] def get_tags(self): return self._tags
[docs] def get_task_dispenser(self): """ :return: the structure of the course """ return self._task_dispenser
def _build_ac_regex(self, list_ac): """ Build a regex for the AC list, allowing for fast matching. The regex is only used internally """ return re.compile('|'.join(re.escape(x).replace("\\*", ".*") for x in list_ac))
[docs] def is_archive(self): """ Returns true if the course is an archive""" return self._is_archive
[docs] def get_archiving_date(self): """ Returns the date at which the course was archived as a string (None if not archived)""" return self._archive_date
[docs] def set_descriptor_element(self, key: str, value: Any): self._content[key] = value
[docs] def save(self): """ Saves the Course into the filesystem """ self._fs.put("course.yaml", get_json_or_yaml("course.yaml", self._content)) if self._new_doc: logging.getLogger("inginious.course").info("Course %s created in the factory.", self._fs.prefix)
[docs] @classmethod def get(cls, courseid : str) -> Course: """ Fetch a course with id courseid from the specified course filesystem""" if not id_checker(courseid): raise InvalidNameException("Course with invalid name: " + courseid) course_fs = get_fs_provider().from_subfolder(courseid) if not course_fs.exists("course.yaml"): raise CourseNotFoundException() course = fetch_or_cache(course_fs, "course.yaml", lambda: _load_course(course_fs, courseid)) translations = {} i18n_fs = course_fs.from_subfolder("$i18n") if i18n_fs.exists(): for f in i18n_fs.list(folders=False, files=True, recursive=False): lang, ext = os.path.splitext(f) if ext == ".mo": translations[lang] = fetch_or_cache(i18n_fs, f, lambda: gettext.GNUTranslations(i18n_fs.get_fd(f))) course.set_translations(translations) return course
[docs] def delete(self): """ Erase the content of the course folder """ invalidate_cache(self._fs) self._fs.delete() logging.getLogger("inginious.course").info("Course %s erased from the factory.", self._fs.prefix)
[docs] @classmethod def get_all(cls) -> dict[str, Course]: """ Returns a dictionnary with courseid=>Course mapping """ output = {} for courseid in [f[0:len(f) - 1] for f in get_fs_provider().list(folders=True, files=False, recursive=False)]: try: output[courseid] = Course.get(courseid) except Exception as e: logging.getLogger("inginious.course").warning("Cannot open course : %s", courseid) return output