# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
""" Utilities for administration pages """
import codecs
import csv
import io
from collections import OrderedDict
import web
from bson.objectid import ObjectId
from inginious.common.base import id_checker
from inginious.frontend.pages.utils import INGIniousAuthPage
[docs]class INGIniousAdminPage(INGIniousAuthPage):
"""
An improved version of INGIniousAuthPage that checks rights for the administration
"""
[docs] def get_course_and_check_rights(self, courseid, taskid=None, allow_all_staff=True):
""" Returns the course with id ``courseid`` and the task with id ``taskid``, and verify the rights of the user.
Raise web.notfound() when there is no such course of if the users has not enough rights.
:param courseid: the course on which to check rights
:param taskid: If not None, returns also the task with id ``taskid``
:param allow_all_staff: allow admins AND tutors to see the page. If false, all only admins.
:returns (Course, Task)
"""
try:
course = self.course_factory.get_course(courseid)
if allow_all_staff:
if not self.user_manager.has_staff_rights_on_course(course):
raise web.notfound()
else:
if not self.user_manager.has_admin_rights_on_course(course):
raise web.notfound()
if taskid is None:
return course, None
else:
return course, course.get_task(taskid)
except:
raise web.notfound()
[docs]class INGIniousSubmissionAdminPage(INGIniousAdminPage):
"""
An INGIniousAdminPage containing some common methods between download/replay pages
"""
def _validate_list(self, usernames):
""" Prevent MongoDB injections by verifying arrays sent to it """
for i in usernames:
if not id_checker(i):
raise web.notfound()
[docs] def get_selected_submissions(self, course, filter_type, selected_tasks, users, aggregations, stype):
"""
Returns the submissions that have been selected by the admin
:param course: course
:param filter_type: users or aggregations
:param selected_tasks: selected tasks id
:param users: selected usernames
:param aggregations: selected aggregations
:param stype: single or all submissions
:return:
"""
if filter_type == "users":
self._validate_list(users)
aggregations = list(self.database.aggregations.find({"courseid": course.get_id(),
"students": {"$in": users}}))
# Tweak if not using classrooms : classroom['students'] may content ungrouped users
aggregations = dict([(username,
aggregation if course.use_classrooms() or (
len(aggregation['groups']) and
username in aggregation['groups'][0]["students"]
) else None
) for aggregation in aggregations for username in users])
else:
self._validate_list(aggregations)
aggregations = list(
self.database.aggregations.find({"_id": {"$in": [ObjectId(cid) for cid in aggregations]}}))
# Tweak if not using classrooms : classroom['students'] may content ungrouped users
aggregations = dict([(username,
aggregation if course.use_classrooms() or (
len(aggregation['groups']) and
username in aggregation['groups'][0]["students"]
) else None
) for aggregation in aggregations for username in aggregation["students"]])
if stype == "single":
user_tasks = list(self.database.user_tasks.find({"username": {"$in": list(aggregations.keys())},
"taskid": {"$in": selected_tasks},
"courseid": course.get_id()}))
submissionsid = [user_task['submissionid'] for user_task in user_tasks if user_task['submissionid'] is not None]
submissions = list(self.database.submissions.find({"_id": {"$in": submissionsid}}))
else:
submissions = list(self.database.submissions.find({"username": {"$in": list(aggregations.keys())},
"taskid": {"$in": selected_tasks},
"courseid": course.get_id(),
"status": {"$in": ["done", "error"]}}))
return submissions, aggregations
[docs] def show_page_params(self, course, user_input):
tasks = sorted(list(course.get_tasks().items()), key=lambda task: (task[1].get_order(), task[1].get_id()))
user_list = self.user_manager.get_course_registered_users(course, False)
users = OrderedDict(sorted(list(self.user_manager.get_users_info(user_list).items()),
key=lambda k: k[1][0] if k[1] is not None else ""))
user_data = OrderedDict(
[(username, user[0] if user is not None else username) for username, user in users.items()])
aggregations = self.user_manager.get_course_aggregations(course)
tutored_aggregations = [str(aggregation["_id"]) for aggregation in aggregations if
self.user_manager.session_username() in aggregation["tutors"] and len(
aggregation['groups']) > 0]
tutored_users = []
for aggregation in aggregations:
for username in aggregation["students"]:
# If no classrooms used, only students inside groups
if self.user_manager.session_username() in aggregation["tutors"] and \
(course.use_classrooms() or
(len(aggregation['groups']) > 0 and username in aggregation['groups'][0]['students'])):
tutored_users += [username]
checked_tasks = list(course.get_tasks().keys())
checked_users = list(user_data.keys())
checked_aggregations = [aggregation['_id'] for aggregation in aggregations]
show_aggregations = False
if "tasks" in user_input:
checked_tasks = user_input.tasks.split(',')
if "users" in user_input:
checked_users = user_input.users.split(',')
if "aggregations" in user_input:
checked_aggregations = user_input.aggregations.split(',')
show_aggregations = True
if "tutored" in user_input:
if user_input.tutored == "aggregations":
checked_aggregations = tutored_aggregations
show_aggregations = True
elif user_input.tutored == "users":
checked_users = tutored_users
show_aggregations = True
for aggregation in aggregations:
aggregation['checked'] = str(aggregation['_id']) in checked_aggregations
return tasks, user_data, aggregations, tutored_aggregations, tutored_users, checked_tasks, checked_users, show_aggregations
[docs]class UnicodeWriter(object):
"""
A CSV writer which will write rows to CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
# Redirect output to a queue
self.queue = io.StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
[docs] def writerow(self, row):
""" Writes a row to the CSV file """
self.writer.writerow(row)
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
# write to the target stream
self.stream.write(data)
# empty queue
self.queue.truncate(0)
self.queue.seek(0)
[docs] def writerows(self, rows):
""" Writes multiple rows to the CSV file """
for row in rows:
self.writerow(row)
[docs]def make_csv(data):
""" Returns the content of a CSV file with the data of the dict/list data """
# Convert sub-dicts to news cols
for entry in data:
rval = entry
if isinstance(data, dict):
rval = data[entry]
todel = []
toadd = {}
for key, val in rval.items():
if isinstance(val, dict):
for key2, val2 in val.items():
toadd[str(key) + "[" + str(key2) + "]"] = val2
todel.append(key)
for k in todel:
del rval[k]
for k, v in toadd.items():
rval[k] = v
# Convert everything to CSV
columns = set()
output = [[]]
if isinstance(data, dict):
output[0].append("id")
for entry in data:
for col in data[entry]:
columns.add(col)
else:
for entry in data:
for col in entry:
columns.add(col)
columns = sorted(columns)
for col in columns:
output[0].append(col)
if isinstance(data, dict):
for entry in data:
new_output = [str(entry)]
for col in columns:
new_output.append(str(data[entry][col]) if col in data[entry] else "")
output.append(new_output)
else:
for entry in data:
new_output = []
for col in columns:
new_output.append(str(entry[col]) if col in entry else "")
output.append(new_output)
csv_string = io.StringIO()
csv_writer = UnicodeWriter(csv_string)
for row in output:
csv_writer.writerow(row)
csv_string.seek(0)
web.header('Content-Type', 'text/csv; charset=utf-8')
web.header('Content-disposition', 'attachment; filename=export.csv')
return csv_string.read()
[docs]class CourseRedirect(INGIniousAdminPage):
""" Redirect admins to /settings and tutors to /task """
[docs] def GET_AUTH(self, courseid): # pylint: disable=arguments-differ
""" GET request """
course, __ = self.get_course_and_check_rights(courseid)
if self.user_manager.session_username() in course.get_tutors():
raise web.seeother(self.app.get_homepath() + '/admin/{}/tasks'.format(courseid))
else:
raise web.seeother(self.app.get_homepath() + '/admin/{}/settings'.format(courseid))
[docs] def POST_AUTH(self, courseid): # pylint: disable=arguments-differ
""" POST request """
return self.GET_AUTH(courseid)