# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
""" Manages submissions """
import io
import gettext
import logging
import os.path
import tarfile
import tempfile
import time
from datetime import datetime
import bson
import pymongo
import web
from bson.objectid import ObjectId
from pymongo.collection import ReturnDocument
import inginious.common.custom_yaml
from inginious.frontend.parsable_text import ParsableText
class WebAppSubmissionManager:
    """ Manages submissions. Communicates with the database and the client. """

    # Multi-dot extensions that os.path.splitext would truncate to their last component (e.g. ".gz")
    _DOUBLE_EXTENSIONS = ('.tar.gz', '.tar.bz2', '.tar.bz', '.tar.xz')

    def __init__(self, client, user_manager, database, gridfs, hook_manager, lti_outcome_manager):
        """
        :type client: inginious.client.client.AbstractClient
        :type user_manager: inginious.frontend.user_manager.UserManager
        :type database: pymongo.database.Database
        :type gridfs: gridfs.GridFS
        :type hook_manager: inginious.common.hook_manager.HookManager
        :param lti_outcome_manager: object whose add(...) method is called, once per student, to send the grade of
            a finished LTI submission back to the tool consumer
        """
        self._client = client
        self._user_manager = user_manager
        self._database = database
        self._gridfs = gridfs
        self._hook_manager = hook_manager
        self._logger = logging.getLogger("inginious.webapp.submissions")
        self._lti_outcome_manager = lti_outcome_manager

    def _job_done_callback(self, submissionid, task, result, grade, problems, tests, custom, archive, stdout, stderr,
                           newsub=True):
        """
        Callback called by Client when a job is done. Updates the submission in the database with the data returned
        after the completion of the job, then calls the "submission_done" hook, updates the statistics of every owner
        and, for LTI submissions, queues the grade to be sent back.

        :param submissionid: id of the submission the job was started for
        :param task: Task related to the submission
        :param result: sequence whose first item is the result status ("success", "failed", or another value meaning
            an INGInious error) and whose second item is the global feedback text
        :param grade: grade returned by the job
        :param problems: per-problem feedback returned by the job
        :param tests: tests data returned by the job
        :param custom: custom data returned by the job
        :param archive: archive returned by the job (stored in GridFS), or None
        :param stdout: standard output of the job
        :param stderr: standard error of the job
        :param newsub: True for a new submission, False for a replay
        """
        data = {
            # error only if error was made by INGInious
            "status": ("done" if result[0] in ("success", "failed") else "error"),
            "result": result[0],
            "grade": grade,
            "text": result[1],
            "tests": tests,
            "problems": problems,
            "archive": (self._gridfs.put(archive) if archive is not None else None),
            "custom": custom,
            "stdout": stdout,
            "stderr": stderr
        }

        # Job-related fields are meaningless once the job is over
        unset_obj = {
            "jobid": "",
            "ssh_host": "",
            "ssh_port": "",
            "ssh_password": ""
        }

        # Save submission to database
        submission = self._database.submissions.find_one_and_update(
            {"_id": ObjectId(submissionid)},
            {"$set": data, "$unset": unset_obj},
            return_document=ReturnDocument.AFTER
        )

        if submission is None:
            # The submission was deleted while its job was running: drop the now-orphaned archive and stop.
            self._logger.warning("Job done for submission %s, which no longer exists; result ignored", submissionid)
            if data["archive"] is not None:
                self._gridfs.delete(data["archive"])
            return

        self._hook_manager.call_hook("submission_done", submission=submission, archive=archive, newsub=newsub)

        for username in submission["username"]:
            self._user_manager.update_user_stats(username, task, submission, result[0], grade, newsub)

        # LTI information is only stored by _before_submission_insertion when the grade must be sent back
        lti_keys = ("outcome_service_url", "outcome_result_id", "outcome_consumer_key")
        if all(key in submission for key in lti_keys):
            for username in submission["username"]:
                self._lti_outcome_manager.add(username,
                                              submission["courseid"],
                                              submission["taskid"],
                                              submission["outcome_consumer_key"],
                                              submission["outcome_service_url"],
                                              submission["outcome_result_id"])

    def _get_group_students(self, task, username):
        """
        Return the students of the group username belongs to when task is a group task and username has no staff
        rights on the course; return None otherwise (the submission is then an individual one).

        :raises Exception: if the task is a group task but username belongs to no group of the course
        """
        if not task.is_group_task() or self._user_manager.has_staff_rights_on_course(task.get_course(), username):
            return None

        group = self._database.aggregations.find_one(
            {"courseid": task.get_course_id(), "groups.students": username},
            {"groups": {"$elemMatch": {"students": username}}})
        if group is None:
            raise Exception("User {} does not belong to any group for this task".format(username))
        return group["groups"][0]["students"]

    def _before_submission_insertion(self, task, inputdata, debug, obj):
        """
        Called before any new submission is inserted into the database. Allows you to modify obj, the new document that
        will be inserted into the database. Should be overridden in subclasses.

        Sets obj["username"] (the whole group for group tasks submitted by students) and, for LTI sessions whose
        course sends grades back, the outcome information used by _job_done_callback.

        :param task: Task related to the submission
        :param inputdata: input of the student
        :param debug: True, False or "ssh". See add_job.
        :param obj: the new document that will be inserted
        """
        username = self._user_manager.session_username()
        group_students = self._get_group_students(task, username)
        obj["username"] = group_students if group_students is not None else [username]

        lti_info = self._user_manager.session_lti_info()
        if lti_info is not None and task.get_course().lti_send_back_grade():
            outcome_service_url = lti_info["outcome_service_url"]
            outcome_result_id = lti_info["outcome_result_id"]
            outcome_consumer_key = lti_info["consumer_key"]

            # safety check
            if outcome_result_id is None or outcome_service_url is None:
                self._logger.error("outcome_result_id or outcome_service_url is None, but grade needs to be sent back to TC! Ignoring.")
                return

            obj.update({"outcome_service_url": outcome_service_url,
                        "outcome_result_id": outcome_result_id,
                        "outcome_consumer_key": outcome_consumer_key})

    def _after_submission_insertion(self, task, inputdata, debug, submission, submissionid):
        """
        Called after any new submission is inserted into the database, but before starting the job. Should be
        overridden in subclasses.

        :param task: Task related to the submission
        :param inputdata: input of the student
        :param debug: True, False or "ssh". See add_job.
        :param submission: the new document that was inserted
        :param submissionid: submission id of the submission
        :return: the ids (as str) of the submissions deleted to respect the storage limit of the task
        """
        # If we are submitting for a group, send the group (user list joined with ",") as username
        if not any(problem.get_id() == "group" for problem in task.get_problems()):  # do not overwrite
            group_students = self._get_group_students(task, self._user_manager.session_username())
            if group_students is not None:
                inputdata["username"] = ','.join(group_students)

        return self._delete_exceeding_submissions(self._user_manager.session_username(), task)

    def replay_job(self, task, submission, copy=False, debug=False):
        """
        Replay a submission: add the same job in the queue, keeping submission id, submission date and input data

        :param task: Task related to the submission
        :param submission: Submission to replay (the document is modified in place when copy is True)
        :param copy: If copy is true, the submission will be copied to admin submissions before replay
        :param debug: True, False or "ssh". If debug is true, more debug data will be saved
        :raises Exception: if no user is logged in
        """
        if not self._user_manager.session_logged_in():
            raise Exception("A user must be logged in to submit an object")

        # Load input data
        inputdata = self.get_input_from_submission(submission, only_input=True)

        old_archive = None
        if not copy:
            submissionid = submission["_id"]
            # The archive will be regenerated; it is deleted once the document no longer references it
            old_archive = submission.get("archive", None)
        else:
            del submission["_id"]
            username = self._user_manager.session_username()
            submission["username"] = [username]
            submission["submitted_on"] = datetime.now()
            inputdata["@username"] = username
            inputdata["@lang"] = self._user_manager.session_language()
            submission["input"] = self._gridfs.put(bson.BSON.encode(inputdata))
            submission["tests"] = {}  # Be sure tags are reinitialized
            submissionid = self._database.submissions.insert_one(submission).inserted_id

        # Clean the submission document in db BEFORE starting the job: if the job finished before this update, its
        # results would otherwise be wiped and the submission would stay "waiting" forever.
        self._database.submissions.update_one(
            {"_id": submissionid},
            {"$set": {"status": "waiting", "response_type": task.get_response_type()},
             "$unset": {"result": "", "grade": "", "text": "", "tests": "", "problems": "", "archive": "",
                        "custom": "", "stdout": "", "stderr": ""}
             })

        if old_archive is not None:
            self._gridfs.delete(old_archive)

        # Don't enable ssh debug
        ssh_callback = lambda host, port, password: None
        if debug == "ssh":
            ssh_callback = lambda host, port, password: self._handle_ssh_callback(submissionid, host, port, password)

        jobid = self._client.new_job(task, inputdata,
                                     (lambda result, grade, problems, tests, custom, archive, stdout, stderr:
                                      self._job_done_callback(submissionid, task, result, grade, problems, tests,
                                                              custom, archive, stdout, stderr, copy)),
                                     "Frontend - {}".format(submission["username"]), debug, ssh_callback)

        # Only record the job id if the job is not already finished (same guard as in add_job)
        self._database.submissions.update_one(
            {"_id": submissionid, "status": "waiting"},
            {"$set": {"jobid": jobid}}
        )

        if not copy:
            self._logger.info("Replaying submission %s - %s - %s - %s", submission["username"], submission["courseid"],
                              submission["taskid"], submissionid)
        else:
            self._logger.info("Copying submission %s - %s - %s - %s as %s", submission["username"], submission["courseid"],
                              submission["taskid"], submissionid, self._user_manager.session_username())

    def get_available_environments(self):
        """:return a list of available environments """
        return self._client.get_available_containers()

    def get_submission(self, submissionid, user_check=True):
        """
        Get a submission from the database.

        :param submissionid: id of the submission (ObjectId or its string form)
        :param user_check: if True, return None when the logged-in user is not an owner of the submission
        :return: the submission document, or None if it does not exist or fails the ownership check
        """
        sub = self._database.submissions.find_one({'_id': ObjectId(submissionid)})
        if sub is None or (user_check and not self.user_is_submission_owner(sub)):
            return None
        return sub

    def add_job(self, task, inputdata, debug=False):
        """
        Add a job in the queue and returns a submission id.

        :param task: Task instance
        :type task: inginious.frontend.tasks.WebAppTask
        :param inputdata: the input as a dictionary (modified in place: "@username", "@lang" and "@random" are added)
        :type inputdata: dict
        :param debug: If debug is true, more debug data will be saved. With "ssh", ssh connection information is
            stored on the submission by _handle_ssh_callback.
        :type debug: bool or str
        :returns: the new submission id and the ids (as str) of the submissions removed to respect the storage limit
        :raises Exception: if no user is logged in, or if a submission is already pending for this task
        """
        if not self._user_manager.session_logged_in():
            raise Exception("A user must be logged in to submit an object")

        username = self._user_manager.session_username()

        # Prevent student from submitting several submissions together
        waiting_submission = self._database.submissions.find_one({
            "courseid": task.get_course_id(),
            "taskid": task.get_id(),
            "username": username,
            "status": "waiting"})
        if waiting_submission is not None:
            raise Exception("A submission is already pending for this task!")

        obj = {
            "courseid": task.get_course_id(),
            "taskid": task.get_id(),
            "status": "waiting",
            "submitted_on": datetime.now(),
            "username": [username],
            "response_type": task.get_response_type()
        }

        # Send additional data to the client in inputdata. For now, the username and the language. New fields can be
        # added with the new_submission hook
        inputdata["@username"] = username
        inputdata["@lang"] = self._user_manager.session_language()

        # Retrieve input random (the user_tasks document may not exist yet)
        random_input = self._database.user_tasks.find_one(
            {"courseid": task.get_course_id(), "taskid": task.get_id(), "username": username}, {"random": 1})
        inputdata["@random"] = (random_input or {}).get("random", [])

        self._hook_manager.call_hook("new_submission", submission=obj, inputdata=inputdata)
        obj["input"] = self._gridfs.put(bson.BSON.encode(inputdata))

        self._before_submission_insertion(task, inputdata, debug, obj)
        submissionid = self._database.submissions.insert_one(obj).inserted_id
        to_remove = self._after_submission_insertion(task, inputdata, debug, obj, submissionid)

        ssh_callback = lambda host, port, password: None
        if debug == "ssh":
            ssh_callback = lambda host, port, password: self._handle_ssh_callback(submissionid, host, port, password)

        jobid = self._client.new_job(task, inputdata,
                                     (lambda result, grade, problems, tests, custom, archive, stdout, stderr:
                                      self._job_done_callback(submissionid, task, result, grade, problems, tests,
                                                              custom, archive, stdout, stderr, True)),
                                     "Frontend - {}".format(username), debug, ssh_callback)

        # The status guard avoids re-adding a jobid if the job already finished
        self._database.submissions.update_one(
            {"_id": submissionid, "status": "waiting"},
            {"$set": {"jobid": jobid}}
        )

        self._logger.info("New submission from %s - %s - %s/%s - %s", username,
                          self._user_manager.session_email(), task.get_course_id(), task.get_id(),
                          web.ctx['ip'])

        return submissionid, to_remove

    def _delete_exceeding_submissions(self, username, task, max_submissions_bound=-1):
        """
        Deletes exceeding submissions from the database, to keep the database relatively small.

        The limit is task.get_stored_submissions(), further bounded by max_submissions_bound when it is positive; a
        non-positive limit keeps everything. The evaluated submission (best grade, or the one selected by the student)
        and all waiting submissions are always kept; the remaining slots go to the most recent submissions.

        :return: the ids (as str) of the deleted submissions
        """
        stored_submissions = task.get_stored_submissions()
        if max_submissions_bound <= 0:
            max_submissions = stored_submissions
        elif stored_submissions <= 0:
            max_submissions = max_submissions_bound
        else:
            max_submissions = min(max_submissions_bound, stored_submissions)

        if max_submissions <= 0:
            return []

        # Oldest first, so that pop() yields the most recent submission
        submissions = list(self._database.submissions.find(
            {"username": username, "courseid": task.get_course_id(), "taskid": task.get_id()},
            projection=["_id", "status", "result", "grade", "submitted_on"],
            sort=[('submitted_on', pymongo.ASCENDING)]))

        # List the entries to keep
        to_keep = set()

        evaluate = task.get_evaluate()
        if evaluate == 'best':
            # Always keep the best graded submission among those with status "done" (the oldest one on ties)
            done = [sub for sub in submissions if sub["status"] == "done"]
            if done:
                to_keep.add(max(done, key=lambda sub: sub["grade"])["_id"])
        elif evaluate == 'student':
            # Always keep the submission selected by the student, if any
            user_task = self._database.user_tasks.find_one({
                "courseid": task.get_course_id(),
                "taskid": task.get_id(),
                "username": username
            })
            submissionid = user_task.get('submissionid', None) if user_task is not None else None
            if submissionid:
                to_keep.add(submissionid)

        # Always keep running submissions
        to_keep.update(sub["_id"] for sub in submissions if sub["status"] == "waiting")

        while len(to_keep) < max_submissions and submissions:
            to_keep.add(submissions.pop()["_id"])

        to_delete = {sub["_id"] for sub in submissions}.difference(to_keep)
        if to_delete:
            self._database.submissions.delete_many({"_id": {"$in": list(to_delete)}})

        return [str(submissionid) for submissionid in to_delete]

    def get_input_from_submission(self, submission, only_input=False):
        """
        Decode the input of a submission, stored in GridFS as a BSON document.

        :param submission: a submission document whose "input" key holds the GridFS id of the input (or None)
        :param only_input: if True, return only the decoded input dictionary
        :return: the decoded input if only_input is True; else the submission itself, modified in place so that its
            "input" key holds the decoded dictionary
        """
        input_id = submission.get("input")
        inputdata = None if input_id is None else bson.BSON.decode(self._gridfs.get(input_id).read())
        if only_input:
            return inputdata
        submission["input"] = inputdata
        return submission

    def get_feedback_from_submission(self, submission, only_feedback=False, show_everything=False,
                                     translation=gettext.NullTranslations()):
        """
        Parse the feedback of a submission (its global "text" and its per-problem messages) with ParsableText.

        :param submission: a submission document
        :param only_feedback: if True, return a new dict with only the parsed "text" and "problems" keys; else the
            submission itself is modified in place and returned
        :param show_everything: if True, feedback normally hidden is shown
        :param translation: translation object given to ParsableText
        :return: the submission (or the feedback-only dict) with parsed feedback. Each problem feedback becomes a
            (result, parsed message) tuple.
        """
        # Keep the full document: the feedback-only dict lacks "response_type" and "result", needed for parsing
        original = submission
        if only_feedback:
            submission = {"text": submission.get("text", None), "problems": dict(submission.get("problems", {}))}

        if "text" in submission:
            submission["text"] = ParsableText(submission["text"], original["response_type"], show_everything,
                                              translation).parse()

        if "problems" in submission:
            problems = submission["problems"]
            for pid in list(problems):
                feedback = problems[pid]
                if isinstance(feedback, str):  # fallback for old-style submissions
                    problem_result, message = original.get('result', 'crash'), feedback
                else:  # new-style submission
                    problem_result, message = feedback[0], feedback[1]
                problems[pid] = (problem_result, ParsableText(message, original["response_type"],
                                                              show_everything, translation).parse())

        return submission

    def is_running(self, submissionid, user_check=True):
        """ Tells if a submission is running/in queue. False if it does not exist or fails the ownership check. """
        submission = self.get_submission(submissionid, user_check)
        return submission is not None and submission["status"] == "waiting"

    def is_done(self, submissionid_or_submission, user_check=True):
        """
        Tells if a submission is done and its result is available.

        :return: True/False, or None if the submission does not exist or fails the ownership check
        """
        # TODO: not a very nice way to avoid too many database call. Should be refactored.
        if isinstance(submissionid_or_submission, dict):
            submission = submissionid_or_submission
        else:
            submission = self.get_submission(submissionid_or_submission, False)

        if submission is None or (user_check and not self.user_is_submission_owner(submission)):
            return None

        return submission["status"] in ("done", "error")

    def kill_running_submission(self, submissionid, user_check=True):
        """ Attempt to kill the remote job associated with this submission id.

        :param submissionid:
        :param user_check: Check if the current user owns this submission
        :return: True if the job was killed, False if an error occurred
        """
        submission = self.get_submission(submissionid, user_check)
        if not submission or "jobid" not in submission:
            return False
        return self._client.kill_job(submission["jobid"])

    def user_is_submission_owner(self, submission):
        """ Returns true if the current user is the owner of this jobid, false else """
        if not self._user_manager.session_logged_in():
            raise Exception("A user must be logged in to verify if he owns a jobid")

        return self._user_manager.session_username() in submission["username"]

    def get_user_submissions(self, task):
        """ Get all the user's submissions for a given task, most recent first """
        if not self._user_manager.session_logged_in():
            raise Exception("A user must be logged in to get his submissions")

        cursor = self._database.submissions.find({"username": self._user_manager.session_username(),
                                                  "taskid": task.get_id(), "courseid": task.get_course_id()})
        cursor.sort([("submitted_on", -1)])
        return list(cursor)

    def get_user_last_submissions(self, limit=5, request=None):
        """
        Get last submissions of a user: the latest submission of each (course, task) pair, most recent first.

        :param limit: maximum number of submissions returned
        :param request: additional MongoDB filter (not modified; the current username is added to a copy)
        """
        request = {} if request is None else dict(request)
        request["username"] = self._user_manager.session_username()

        # Before, submissions were first sorted by submission date, then grouped
        # and then resorted by submission date before limiting. Actually, grouping
        # and pushing, keeping the max date, followed by result filtering is much more
        # efficient
        data = self._database.submissions.aggregate([
            {"$match": request},
            {"$group": {"_id": {"courseid": "$courseid", "taskid": "$taskid"},
                        "submitted_on": {"$max": "$submitted_on"},
                        "submissions": {"$push": {
                            "_id": "$_id",
                            "result": "$result",
                            "status": "$status",
                            "courseid": "$courseid",
                            "taskid": "$taskid",
                            "submitted_on": "$submitted_on"
                        }},
                        }},
            {"$project": {
                "submitted_on": 1,
                "submissions": {
                    # This could be replaced by $filter if mongo v3.2 is set as dependency
                    "$setDifference": [
                        {"$map": {
                            "input": "$submissions",
                            "as": "submission",
                            "in": {
                                "$cond": [{"$eq": ["$submitted_on", "$$submission.submitted_on"]}, "$$submission", False]
                            }
                        }},
                        [False]
                    ]
                }
            }},
            {"$sort": {"submitted_on": pymongo.DESCENDING}},
            {"$limit": limit}
        ])

        return [item["submissions"][0] for item in data]

    def get_gridfs(self):
        """ Returns the GridFS used by the submission manager """
        return self._gridfs

    @staticmethod
    def _compute_archive_base_path(submission, username, sub_folders, aggregations):
        """
        Compute the directory of a submission inside the archive: "" for the root, else a path ending with "/".

        Each entry of sub_folders prepends one directory level, so the LAST entry is the outermost directory.
        Unknown entries, and "aggregation" for a user absent from aggregations, add an empty level.
        """
        base_path = "/"
        for sub_folder in sub_folders:
            folder = ""
            if sub_folder == 'taskid':
                folder = submission['taskid']
            elif sub_folder == 'username':
                folder = '-'.join(submission['username'])
            elif sub_folder == 'aggregation' and username in aggregations:
                aggregation = aggregations[username]
                if aggregation is None:
                    # If classrooms are not used, and user is not grouped, his classroom is replaced by None
                    folder = '-'.join(submission['username'])
                else:
                    folder = (aggregation["description"] + " (" + str(aggregation["_id"]) + ")").replace(" ", "_")
            base_path = '/' + folder + base_path
        return base_path[1:]

    @classmethod
    def _uploaded_file_extension(cls, filename):
        """ Return the extension of filename, keeping multi-dot archive extensions (e.g. ".tar.gz") whole """
        for double_extension in cls._DOUBLE_EXTENSIONS:
            if filename.endswith(double_extension):
                return double_extension
        return os.path.splitext(filename)[1]

    def _add_job_archive_to_tar(self, tar, archive_id, prefix):
        """ Copy every member of the gzipped job archive stored in GridFS under archive_id into tar, under prefix """
        subfile = self._gridfs.get(archive_id)
        try:
            with tarfile.open(fileobj=subfile, mode="r:gz") as subtar:
                for member in subtar.getmembers():
                    subtarfile = subtar.extractfile(member)
                    member.name = prefix + member.name
                    tar.addfile(member, subtarfile)
        finally:
            subfile.close()

    def _add_uploaded_files_to_tar(self, tar, inputdata, prefix, mtime):
        """ Add to tar, under prefix, every file uploaded by the student (input values that are dicts) """
        for pid, problem in inputdata.items():
            # If problem is a dict, it is a file (from the specification of the problems)
            if isinstance(problem, dict):
                subfile = io.BytesIO(problem['value'])
                info = tarfile.TarInfo(name=prefix + pid + self._uploaded_file_extension(problem['filename']))
                info.size = subfile.getbuffer().nbytes
                info.mtime = mtime
                tar.addfile(info, fileobj=subfile)

    def get_submission_archive(self, submissions, sub_folders, aggregations, archive_file=None):
        """
        Build a tgz archive containing, for each submission, its YAML dump (submission.test), the job archive if any,
        and the files uploaded by the student.

        :param submissions: a list of submissions
        :param sub_folders: list of folder levels among 'taskid', 'username' and 'aggregation'. Each entry prepends a
            directory level, so the last entry is the outermost directory: [] puts all submissions at the root,
            ['taskid'] gives taskid/, ['taskid', 'username'] gives username/taskid/.
            NOTE(review): older documentation stated the reverse order; confirm what callers pass.
        :param aggregations: dict mapping usernames to their aggregation document (or None), used by 'aggregation'
        :param archive_file: optional file-like object to write into; a temporary file is used otherwise
        :return: a file-like object containing a tgz archive of all the submissions, positioned at its start
        """
        tmpfile = archive_file if archive_file is not None else tempfile.TemporaryFile()
        added_names = set()  # submission.test paths already written, to avoid duplicates

        with tarfile.open(fileobj=tmpfile, mode='w:gz') as tar:
            for submission in submissions:
                submission = self.get_input_from_submission(submission)
                submission_yaml = io.BytesIO(inginious.common.custom_yaml.dump(submission).encode('utf-8'))

                # Considering multiple single submissions for each user
                for username in submission["username"]:
                    submission_path = (self._compute_archive_base_path(submission, username, sub_folders, aggregations)
                                       + str(submission["_id"]))
                    submission_yaml_fname = submission_path + '/submission.test'

                    # Avoid putting two times the same submission on the same place
                    if submission_yaml_fname in added_names:
                        continue
                    added_names.add(submission_yaml_fname)

                    mtime = time.mktime(submission["submitted_on"].timetuple())

                    info = tarfile.TarInfo(name=submission_yaml_fname)
                    info.size = submission_yaml.getbuffer().nbytes
                    info.mtime = mtime
                    # Rewind: the same buffer is written again when a group submission lands in several directories
                    submission_yaml.seek(0)
                    tar.addfile(info, fileobj=submission_yaml)

                    # If there is an archive, add it too
                    if submission.get('archive') not in (None, ""):
                        self._add_job_archive_to_tar(tar, submission['archive'], submission_path + "/archive/")

                    # If there are files that were uploaded by the student, add them
                    if submission['input'] is not None:
                        self._add_uploaded_files_to_tar(tar, submission['input'],
                                                        submission_path + '/uploaded_files/', mtime)

        # Put tempfile cursor at 0
        tmpfile.seek(0)
        return tmpfile

    def _handle_ssh_callback(self, submission_id, host, port, password):
        """ Stores on the submission the connection information of the remote ssh server opened for debugging """
        if host is not None:  # ignore late calls (a bit hacky, but...)
            obj = {
                "ssh_host": host,
                "ssh_port": port,
                "ssh_password": password
            }
            self._database.submissions.update_one({"_id": submission_id}, {"$set": obj})

    def get_job_queue_snapshot(self):
        """ Get a snapshot of the remote backend job queue. May be a cached version.
        May not contain recent jobs. May return None (or (None, None)) if no snapshot is available.

        Otherwise, returns a tuple of two lists (jobs_running, jobs_waiting):

        jobs_running: a list of tuples in the form
            (job_id, is_current_client_job, info, launcher, started_at, max_end)
            where
            - job_id is a job id. It may be from another client.
            - is_current_client_job is a boolean indicating if the client that asked the request has started the job
            - agent_name is the agent name
            - info is "courseid/taskid"
            - launcher is the name of the launcher, which may be anything
            - started_at the time (in seconds since UNIX epoch) at which the job started
            - max_end the time at which the job will timeout (in seconds since UNIX epoch), or -1 if no timeout is set
            NOTE(review): agent_name is described but absent from the tuple layout; confirm against the client.
        jobs_waiting: a list of tuples in the form
            (job_id, is_current_client_job, info, launcher, max_time)
            where
            - job_id is a job id. It may be from another client.
            - is_current_client_job is a boolean indicating if the client that asked the request has started the job
            - info is "courseid/taskid"
            - launcher is the name of the launcher, which may be anything
            - max_time the maximum time that can be used, or -1 if no timeout is set
        """
        return self._client.get_job_queue_snapshot()

    def get_job_queue_info(self, jobid):
        """
        :param jobid: the JOB id (not the submission id!). You should retrieve it before calling this function by
            calling get_submission(...)["jobid"].
        :return: If the submission is in the queue, then returns a tuple (nb tasks before running (or -1 if running),
            approx wait time in seconds). Else, returns None
        """
        return self._client.get_job_queue_info(jobid)
def update_pending_jobs(database):
    """
    Updates pending jobs status in the database.

    Every submission still marked as "waiting" is turned into an error with a grade of 0.0, and its job id is
    removed, because the server restarted and the results of those jobs will never arrive.

    :param database: the database holding the "submissions" collection (pymongo.database.Database)
    """
    # Updates the submissions that are waiting with the status error, as the server restarted.
    # update_many replaces the deprecated update(..., multi=True), removed in PyMongo 4.
    database.submissions.update_many({'status': 'waiting'},
                                     {"$unset": {'jobid': ""},
                                      "$set": {'status': 'error', 'grade': 0.0,
                                               'text': 'Internal error. Server restarted'}})