Source code for inginious.frontend.pages.tasks

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.

""" Task page """
import json
import mimetypes
import posixpath
import urllib.error
import urllib.parse
import urllib.request
import random
import time
import flask

from flask import redirect, Response
from werkzeug.exceptions import NotFound, HTTPException
from bson.objectid import ObjectId
from pymongo import ReturnDocument

from inginious.common.exceptions import TaskNotFoundException, CourseNotFoundException
from inginious.frontend.pages.course import handle_course_unavailable
from inginious.frontend.pages.utils import INGIniousPage, INGIniousAuthPage


[docs]class BaseTaskPage(object): """ Display a task (and allow to reload old submission/file uploaded during a submission) """ def __init__(self, calling_page): self.cp = calling_page self.submission_manager = self.cp.submission_manager self.user_manager = self.cp.user_manager self.database = self.cp.database self.course_factory = self.cp.course_factory self.template_helper = self.cp.template_helper self.default_allowed_file_extensions = self.cp.default_allowed_file_extensions self.default_max_file_size = self.cp.default_max_file_size self.webterm_link = self.cp.webterm_link self.plugin_manager = self.cp.plugin_manager
[docs] def preview_allowed(self, courseid, taskid): try: course = self.course_factory.get_course(courseid) except CourseNotFoundException as ex: raise NotFound(description=str(ex)) return course.get_accessibility().is_open() and course.allow_preview()
[docs] def GET(self, courseid, taskid, is_LTI): """ GET request """ username = self.user_manager.session_username() # Fetch the course try: course = self.course_factory.get_course(courseid) except CourseNotFoundException as ex: raise NotFound(description=str(ex)) if is_LTI and not self.user_manager.course_is_user_registered(course): self.user_manager.course_register_user(course, force=True) if not self.user_manager.course_is_open_to_user(course, username, is_LTI): return handle_course_unavailable(self.cp.app.get_homepath(), self.template_helper, self.user_manager, course) is_staff = self.user_manager.has_staff_rights_on_course(course, username) try: task = course.get_task(taskid) if not self.user_manager.task_is_visible_by_user(task, username, is_LTI): return self.template_helper.render("task_unavailable.html") except TaskNotFoundException: raise NotFound() user_task_list = course.get_task_dispenser().get_user_task_list([username])[username] if taskid not in user_task_list: previous_taskid = None next_taskid = None else: # Compute previous and next taskid index = user_task_list.index(taskid) previous_taskid = user_task_list[index - 1] if index > 0 else None next_taskid = user_task_list[index + 1] if index < len(user_task_list) - 1 else None self.user_manager.user_saw_task(username, courseid, taskid) is_staff = self.user_manager.has_staff_rights_on_course(course, username) userinput = flask.request.args if "submissionid" in userinput and "questionid" in userinput: # Download a previously submitted file submission = self.submission_manager.get_submission(userinput["submissionid"], user_check=not is_staff) if submission is None: raise self.cp.app.notfound(message=_("Submission doesn't exist.")) sinput = self.submission_manager.get_input_from_submission(submission, True) if userinput["questionid"] not in sinput: raise NotFound() if isinstance(sinput[userinput["questionid"]], dict): # File uploaded previously mimetypes.init() mime_type = mimetypes.guess_type(urllib.request.pathname2url(sinput[userinput["questionid"]]['filename'])) return Response(response=sinput[userinput["questionid"]]['value'], content_type=mime_type[0]) else: # Other file, download it as text return Response(response=sinput[userinput["questionid"]], content_type='text/plain') else: # Generate random inputs and save it into db random.seed(str(username if username is not None else "") + taskid + courseid + str( time.time() if task.regenerate_input_random() else "")) random_input_list = [random.random() for i in range(task.get_number_input_random())] user_task = self.database.user_tasks.find_one_and_update( { "courseid": task.get_course_id(), "taskid": task.get_id(), "username": self.user_manager.session_username() }, { "$set": {"random": random_input_list} }, return_document=ReturnDocument.AFTER ) submissionid = user_task.get('submissionid', None) eval_submission = self.database.submissions.find_one({'_id': ObjectId(submissionid)}) if submissionid else None students = [self.user_manager.session_username()] if task.is_group_task() and not self.user_manager.has_admin_rights_on_course(course, username): group = self.database.groups.find_one({"courseid": task.get_course_id(), "students": self.user_manager.session_username()}) if group is not None: students = group["students"] # we don't care for the other case, as the student won't be able to submit. submissions = self.submission_manager.get_user_submissions(task) if self.user_manager.session_logged_in() else [] user_info = self.user_manager.get_user_info(username) # Visible tags course_tags = course.get_tags() task_categories = task.get_categories() visible_tags = [course_tags[category] for category in task_categories if course_tags[category].is_visible_for_student() or self.user_manager.has_staff_rights_on_course(course)] # Problem dict pdict = {problem.get_id(): problem.get_type() for problem in task.get_problems()} is_input_list = {problem.get_id(): 1 if problem.input_type() == list else 0 for problem in task.get_problems()} # Display the task itself return self.template_helper.render("task.html", user_info=user_info, course=course, task=task, submissions=submissions, students=students, eval_submission=eval_submission, user_task=user_task, previous_taskid=previous_taskid, next_taskid=next_taskid, webterm_link=self.webterm_link, input_random_list=random_input_list, visible_tags=visible_tags, pdict=pdict, is_input_list=is_input_list)
[docs] def POST(self, courseid, taskid, isLTI): """ POST a new submission """ username = self.user_manager.session_username() course = self.course_factory.get_course(courseid) if not self.user_manager.course_is_open_to_user(course, username, isLTI): return handle_course_unavailable(self.cp.app.get_homepath(), self.template_helper, self.user_manager, course) is_staff = self.user_manager.has_staff_rights_on_course(course, username) is_admin = self.user_manager.has_admin_rights_on_course(course, username) task = course.get_task(taskid) if not self.user_manager.task_is_visible_by_user(task, username, isLTI): return self.template_helper.render("task_unavailable.html") self.user_manager.user_saw_task(username, courseid, taskid) userinput = flask.request.form if "@action" in userinput and userinput["@action"] == "submit": # Verify rights if not self.user_manager.task_can_user_submit(task, username, isLTI): return json.dumps({"status": "error", "title": _("Error"), "text": _("You are not allowed to submit for this task.")}) # Retrieve input random and check still valid random_input = self.database.user_tasks.find_one({"courseid": task.get_course_id(), "taskid": task.get_id(), "username": username}, { "random": 1 }) random_input = random_input["random"] if "random" in random_input else [] for i in range(0, len(random_input)): s = "@random_" + str(i) if s not in userinput or float(userinput[s]) != random_input[i]: return json.dumps({"status": "error", "title": _("Error"), "text": _("Your task has been regenerated. This current task is outdated.")}) # Reparse user input with array for multiple choices and files userinput = flask.request.form.copy() for problem in task.get_problems(): pid = problem.get_id() if problem.input_type() == list: userinput[pid] = flask.request.form.getlist(pid) elif problem.input_type() == dict: userinput[pid] = flask.request.files.get(pid) else: userinput[pid] = flask.request.form.get(pid) userinput = task.adapt_input_for_backend(userinput) if not task.input_is_consistent(userinput, self.default_allowed_file_extensions, self.default_max_file_size): return Response(content_type='application/json', response=json.dumps({ "status": "error", "title": _("Error"), "text": _("Please answer to all the questions and verify the extensions of the files " "you want to upload. Your responses were not tested.") })) del userinput['@action'] # Get debug info if the current user is an admin debug = is_admin if "@debug-mode" in userinput: if userinput["@debug-mode"] == "ssh" and debug: debug = "ssh" del userinput['@debug-mode'] # Start the submission try: submissionid, oldsubids = self.submission_manager.add_job(task, userinput, debug) return Response(content_type='application/json', response=json.dumps({ "status": "ok", "submissionid": str(submissionid), "remove": oldsubids, "text": _("<b>Your submission has been sent...</b>") })) except Exception as ex: return Response(content_type='application/json', response=json.dumps({ "status": "error", "title": _("Error"), "text": str(ex) })) elif "@action" in userinput and userinput["@action"] == "check" and "submissionid" in userinput: result = self.submission_manager.get_submission(userinput['submissionid'], user_check=not is_staff) if result is None: return Response(content_type='application/json', response=json.dumps({ 'status': "error", "title": _("Error"), "text": _("Internal error") })) elif self.submission_manager.is_done(result, user_check=not is_staff): result = self.submission_manager.get_input_from_submission(result) result = self.submission_manager.get_feedback_from_submission(result, show_everything=is_staff) # user_task always exists as we called user_saw_task before user_task = self.database.user_tasks.find_one({ "courseid":task.get_course_id(), "taskid": task.get_id(), "username": {"$in": result["username"]} }) default_submissionid = user_task.get('submissionid', None) if default_submissionid is None: # This should never happen, as user_manager.update_user_stats is called whenever a submission is done. return Response(content_type='application/json', response=json.dumps({ 'status': "error", "title": _("Error"), "text": _("Internal error") })) return Response(content_type='application/json', response=self.submission_to_json( task, result, is_admin, False, default_submissionid == result['_id'], tags=course.get_tags() )) else: return Response(content_type='application/json', response=self.submission_to_json( task, result, is_admin, False, tags=course.get_tags() )) elif "@action" in userinput and userinput["@action"] == "load_submission_input" and "submissionid" in userinput: submission = self.submission_manager.get_submission(userinput["submissionid"], user_check=not is_staff) submission = self.submission_manager.get_input_from_submission(submission) submission = self.submission_manager.get_feedback_from_submission(submission, show_everything=is_staff) if not submission: raise NotFound(description=_("Submission doesn't exist.")) return Response(content_type='application/json', response=self.submission_to_json( task, submission, is_admin, True, tags=course.get_tags() )) elif "@action" in userinput and userinput["@action"] == "kill" and "submissionid" in userinput: self.submission_manager.kill_running_submission(userinput["submissionid"]) # ignore return value return Response(content_type='application/json', response=json.dumps({'status': 'done'})) else: raise NotFound()
[docs] def submission_to_json(self, task, data, debug, reloading=False, replace=False, tags=None): """ Converts a submission to json (keeps only needed fields) """ if tags is None: tags = [] if "ssh_host" in data: return json.dumps({ 'status': "waiting", 'text': "<b>SSH server active</b>", 'ssh_host': data["ssh_host"], 'ssh_port': data["ssh_port"], 'ssh_user': data["ssh_user"], 'ssh_password': data["ssh_password"] }) # Here we are waiting. Let's send some useful information. waiting_data = self.submission_manager.get_job_queue_info(data["jobid"]) if "jobid" in data else None if waiting_data is not None and not reloading: nb_tasks_before, approx_wait_time = waiting_data wait_time = round(approx_wait_time) if nb_tasks_before == -1 and wait_time <= 0: text = _("<b>INGInious is currently grading your answers.</b> (almost done)") elif nb_tasks_before == -1: text = _("<b>INGInious is currently grading your answers.</b> (Approx. wait time: {} seconds)").format( wait_time) elif nb_tasks_before == 0: text = _("<b>You are next in the waiting queue!</b>") elif nb_tasks_before == 1: text = _("<b>There is one task in front of you in the waiting queue.</b>") else: text = _("<b>There are {} tasks in front of you in the waiting queue.</b>").format(nb_tasks_before) return json.dumps({'status': "waiting", 'text': text}) tojson = { 'status': data['status'], 'result': data.get('result', 'crash'), 'id': str(data["_id"]), 'submitted_on': str(data['submitted_on']), 'grade': str(data.get("grade", 0.0)), 'replace': replace and not reloading # Replace the evaluated submission } if "text" in data: tojson["text"] = data["text"] if "problems" in data: tojson["problems"] = data["problems"] if debug: tojson["debug"] = self._cut_long_chains(data) if tojson['status'] == 'waiting': tojson["title"] = _("<b>Your submission has been sent...</b>") elif tojson["result"] == "failed": tojson["title"] = _("There are some errors in your answer. Your score is {score}%.").format(score=data["grade"]) elif tojson["result"] == "success": tojson["title"] = _("Your answer passed the tests! Your score is {score}%.").format(score=data["grade"]) elif tojson["result"] == "timeout": tojson["title"] = _("Your submission timed out. Your score is {score}%.").format(score=data["grade"]) elif tojson["result"] == "overflow": tojson["title"] = _("Your submission made an overflow. Your score is {score}%.").format(score=data["grade"]) elif tojson["result"] == "killed": tojson["title"] = _("Your submission was killed.") else: tojson["title"] = _("An internal error occurred. Please retry later. " "If the error persists, send an email to the course administrator.") tojson["title"] += " " + _("[Submission #{submissionid}]").format(submissionid=data["_id"]) tojson["title"] = self.plugin_manager.call_hook_recursive("feedback_title", task=task, submission=data, title=tojson["title"])["title"] tojson["text"] = data.get("text", "") tojson["text"] = self.plugin_manager.call_hook_recursive("feedback_text", task=task, submission=data, text=tojson["text"])["text"] if reloading: # Set status='ok' because we are reloading an old submission. tojson["status"] = 'ok' # And also include input tojson["input"] = data.get('input', {}) if "tests" in data: tojson["tests"] = {} for key, tag in tags.items(): # Tags only visible for admins should not appear in the json for students. if tag.get_type() in [0, 1] and (tag.is_visible_for_student() or debug) and key in data["tests"]: tojson["tests"][key] = data["tests"][key] if debug: #We add also auto tags when we are admin for tag in data["tests"]: if tag.startswith("*auto-tag-"): tojson["tests"][tag] = data["tests"][tag] # allow plugins to insert javascript to be run in the browser after the submission is loaded tojson["feedback_script"] = "".join(self.plugin_manager.call_hook("feedback_script", task=task, submission=data)) return json.dumps(tojson, default=str)
def _cut_long_chains(self, data, limit=1000): """ Cut all strings and byte chains in a dictionary if they exceed a limit in characters or bytes """ if isinstance(data, dict): new_data = {} for key, value in data.items(): if isinstance(value, bytes) and len(value) > limit: new_data[key] = value[:limit] + _(" <truncated>").encode("utf-8") elif isinstance(value, str) and len(value) > limit: new_data[key] = value[:limit] + _(" <truncated>") elif isinstance(value, dict): new_data[key] = self._cut_long_chains(value) else: new_data[key] = value return new_data return data
[docs]class TaskPageStaticDownload(INGIniousPage): """ Allow to download files stored in the task folder """
[docs] def is_lti_page(self): # authorize LTI sessions to download static files return True
[docs] def GET(self, courseid, taskid, path): # pylint: disable=arguments-differ """ GET request """ try: course = self.course_factory.get_course(courseid) if not self.user_manager.course_is_open_to_user(course): return handle_course_unavailable(self.app.get_homepath(), self.template_helper, self.user_manager, course) path_norm = posixpath.normpath(urllib.parse.unquote(path)) if taskid == "$common": public_folder = course.get_fs().from_subfolder("$common").from_subfolder("public") else: task = course.get_task(taskid) if not self.user_manager.task_is_visible_by_user(task): # ignore LTI check here return self.template_helper.render("task_unavailable.html") public_folder = task.get_fs().from_subfolder("public") (method, mimetype_or_none, file_or_url) = public_folder.distribute(path_norm, False) if method == "local": return Response(response=file_or_url, content_type=mimetype_or_none) elif method == "url": return redirect(file_or_url) else: raise NotFound() except TaskNotFoundException: raise NotFound() except HTTPException as error_or_redirect: raise error_or_redirect
[docs]class TaskPage(INGIniousAuthPage):
[docs] def GET_AUTH(self, courseid, taskid): return BaseTaskPage(self).GET(courseid, taskid, False)
[docs] def POST_AUTH(self, courseid, taskid): return BaseTaskPage(self).POST(courseid, taskid, False)
[docs] def preview_allowed(self, courseid, taskid): return BaseTaskPage(self).preview_allowed(courseid, taskid)