Source code for inginious.frontend.pages.course_admin.student_list

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
import io
import csv
import json

import yaml

import flask
from collections import OrderedDict
from bson import ObjectId
from pymongo import ReturnDocument
from flask import Response
from io import StringIO
from inginious.common import custom_yaml
from inginious.frontend.pages.course_admin.utils import make_csv, INGIniousAdminPage


[docs]class CourseStudentListPage(INGIniousAdminPage): """ Course administration page: list of registered students """
[docs] def GET_AUTH(self, courseid): # pylint: disable=arguments-differ """ GET request """ course, __ = self.get_course_and_check_rights(courseid) if "preferred_field" in flask.request.args and flask.request.args["preferred_field"] in \ ['username', 'email']: preferred_field = flask.request.args["preferred_field"] audiences = [] si = StringIO() cw = csv.writer(si) for audience in self.user_manager.get_course_audiences(course): for student in audience["students"]: field_value = self.get_requested_field_user_info(student, preferred_field) audiences.append([field_value, preferred_field, "student", audience["description"]]) for tutor in audience["tutors"]: field_value = self.get_requested_field_user_info(tutor, preferred_field) audiences.append([field_value, preferred_field, "tutor", audience["description"]]) cw.writerows(audiences) response = Response(response=si.getvalue(), content_type='text/csv') response.headers['Content-Disposition'] = 'attachment; filename="audiences.csv"' return response if "download_groups" in flask.request.args: groups = [{"description": group["description"], "students": group["students"], "size": group["size"], "audiences": [str(c) for c in group["audiences"]]} for group in self.user_manager.get_course_groups(course)] response = Response(response=yaml.dump(groups), content_type='text/x-yaml') response.headers['Content-Disposition'] = 'attachment; filename="groups.yaml"' return response return self.page(course, active_tab="tab_audiences" if "audiences" in flask.request.args else "tab_students")
[docs] def POST_AUTH(self, courseid): # pylint: disable=arguments-differ """ POST request """ course, __ = self.get_course_and_check_rights(courseid) data = flask.request.form.copy() data["delete"] = flask.request.form.getlist("delete") data["groupfile"] = flask.request.files.get("groupfile") data["audiencefile"] = flask.request.files.get("audiencefile") error = {} msg = {} active_tab = "tab_students" self.post_student_list(course, data) active_tab = self.post_audiences(course, data, active_tab, msg, error) active_tab = self.post_groups(course, data, active_tab, msg, error) return self.page(course, active_tab, msg, error)
[docs] def submission_url_generator_user(self, username): """ Generates a submission url """ return "?format=taskid%2Fusername&users=" + username
[docs] def submission_url_generator_audience(self, audienceid): """ Generates a submission url """ return "?audiences=" + str(audienceid)
[docs] def page(self, course, active_tab="tab_students", msg=None, error=None): """ Get all data and display the page """ if error is None: error = {} if msg is None: msg = {} split_audiences, audiences = self.get_audiences_params(course) user_data = self.get_student_list_params(course) groups = self.user_manager.get_course_groups(course) student_list, audience_list, other_students, users_info = self.get_user_lists(course) if "csv_audiences" in flask.request.args: return make_csv(audiences) if "csv_student" in flask.request.args: return make_csv(user_data) return self.template_helper.render("course_admin/student_list.html", course=course, user_data=list(user_data.values()), audiences=split_audiences, active_tab=active_tab, student_list=student_list, audience_list=audience_list, other_students=other_students, users_info=users_info, groups=groups, error=error, msg=msg)
[docs] def get_student_list_params(self, course): users = sorted(list( self.user_manager.get_users_info(self.user_manager.get_course_registered_users(course, False)).items()), key=lambda k: k[1].realname if k[1] is not None else "") users = OrderedDict(sorted(list(self.user_manager.get_users_info(course.get_admins()).items()), key=lambda k: k[1].realname if k[1] is not None else "") + users) user_data = OrderedDict([(username, { "username": username, "realname": user.realname if user is not None else "", "email": user.email if user is not None else "", "total_tasks": 0, "task_grades": {"answer": 0, "match": 0}, "task_succeeded": 0, "task_tried": 0, "total_tries": 0, "grade": 0, "url": self.submission_url_generator_user(username)}) for username, user in users.items()]) for username, data in self.user_manager.get_course_caches(list(users.keys()), course).items(): user_data[username].update(data if data is not None else {}) return user_data
[docs] def get_audiences_params(self, course): audiences = OrderedDict() taskids = list(course.get_tasks().keys()) for audience in self.user_manager.get_course_audiences(course): audiences[audience['_id']] = dict(list(audience.items()) + [("tried", 0), ("done", 0), ("url", self.submission_url_generator_audience(audience['_id'])) ]) data = list(self.database.submissions.aggregate( [ { "$match": { "courseid": course.get_id(), "taskid": {"$in": taskids}, "username": {"$in": audience["students"]} } }, { "$group": { "_id": "$taskid", "tried": {"$sum": 1}, "done": {"$sum": {"$cond": [{"$eq": ["$result", "success"]}, 1, 0]}} } }, ])) for c in data: audiences[audience['_id']]["tried"] += 1 if c["tried"] else 0 audiences[audience['_id']]["done"] += 1 if c["done"] else 0 my_audiences, other_audiences = [], [] for audience in audiences.values(): if self.user_manager.session_username() in audience["tutors"]: my_audiences.append(audience) else: other_audiences.append(audience) return [my_audiences, other_audiences], audiences
[docs] def post_student_list(self, course, data): if "remove_student" in data: try: if data["type"] == "all": audiences = list(self.database.audiences.find({"courseid": course.get_id()})) for audience in audiences: audience["students"] = [] self.database.audiences.replace_one({"_id": audience["_id"]}, audience) groups = list(self.database.groups.find({"courseid": course.get_id()})) for group in groups: group["students"] = [] self.database.groups.replace_one({"_id": group["_id"]}, group) self.database.courses.find_one_and_update({"_id": course.get_id()}, {"$set": {"students": []}}) else: self.user_manager.course_unregister_user(course.get_id(), data["username"]) except: pass elif "register_student" in data: try: self.user_manager.course_register_user(course, data["username"].strip(), '', True) except: pass
[docs] def post_audiences(self, course, data, active_tab, msg, error): try: if 'audience' in data: self.database.audiences.insert_one({"courseid": course.get_id(), "students": [], "tutors": [], "description": data['audience']}) msg["audiences"] = _("New audience created.") active_tab = "tab_audiences" except: msg["audiences"] = _('User returned an invalid form.') error["audiences"] = True active_tab = "tab_audiences" try: if "audiencefile" in data and 'upload_audiences_creation' in data: # get the Werkzeug datastructures.FileStorage object. # The stream of this object is the stream body of the uploaded file. # Furthermore, FileStorage.stream seems to inherit ʻio.BufferedIOBase`, so this stream should be boiled. # As reader return an iterator and that we iterate twice, it is faster to cast into a list. csv_data = list(csv.reader(io.TextIOWrapper(data["audiencefile"], encoding='utf-8'))) # Define used variables. students_per_audience = {} tutors_per_audience = {} course_students = [] course_tutors = [] audiences = [] # Check correctness of CSV structure. for line in csv_data: if len(line) != 4: msg["audiences"] = _("File wrongly formatted.") error["audiences"] = True if "audiences" not in error or not error["audiences"]: stud_list, aud_li, oth_stu, u_info = self.get_user_lists(course) courseid = course.get_id() # Fully remove previous audiences. self.database.audiences.delete_many({"courseid": courseid}) # read datas from CSV. for user_id, field, role, description in csv_data: user_id = user_id.strip() field = field.strip() role = role.strip() if description != "": description = description.strip() if field not in ["username", "email"]: msg["audiences"] = _("Field was not recognized: ") + field error["audiences"] = True continue if role not in ["student", "tutor"]: msg["audiences"] = _("Unknown role: ") + role error["audiences"] = True continue user = self.database.users.find_one({field: user_id}) if user is not None: user_id = user["username"] else: msg["audiences"] = _("User was not found: ") + user_id error["audiences"] = True continue # prepare datas to avoid multiple request to database. if role == "student": students_per_audience.setdefault(description, []).append(user_id) course_students.append(user_id) else: tutors_per_audience.setdefault(description, []).append(user_id) course_tutors.append(user_id) # Creation of audiences. if len(students_per_audience) > 0: for key, value in students_per_audience.items(): audiences.append({"description": key, "courseid": courseid, "students": value, "tutors": tutors_per_audience[key] if key in tutors_per_audience else []}) else: for key, value in tutors_per_audience.items(): audiences.append({"description": key, "courseid": courseid, "students": [], "tutors": value}) # update list of students and tutors of the course. new_students = list(set(stud_list).union(set(course_students))) new_tutors = list(set(course.get_admins()).union(set(course_tutors))) self.database.courses.update_one({"_id": courseid}, {"$set": {"students": new_students, "tutors": new_tutors}}) # this is done to avoid removing the audience id and impact the group audience filter. for audience in audiences: existing_audience = self.database.audiences.find_one( {"courseid": courseid, "description": audience["description"]}) if not existing_audience: self.database.audiences.insert_one(audience) else: self.database.audiences.update_one({"courseid": courseid, "description": audience["description"]}, {"$set": {"students": audience["students"], "tutors": audience["tutors"]}}) active_tab = "tab_audiences" except Exception as e: msg["audiences"] = _('An error occurred while parsing the data.') error["audiences"] = True active_tab = "tab_audiences" return active_tab
[docs] def get_requested_field_user_info(self, username, preferred_field): if preferred_field != "username": # query user username = self.database.users.find_one({"username": username})[preferred_field] return username
[docs] def post_groups(self, course, data, active_tab, msg, error): if course.is_lti(): return active_tab audience_list = self.user_manager.get_course_audiences(course) audience_students = {} for audience in audience_list: for stud in audience["students"]: audience_students.setdefault(stud, []).append(audience["_id"]) errored_students = [] if len(data["delete"]): for classid in data["delete"]: # Get the group group = self.database.groups.find_one( {"_id": ObjectId(classid), "courseid": course.get_id()}) if ObjectId.is_valid(classid) else None if group is None: msg["groups"] = ("group with id {} not found.").format(classid) error["groups"] = True else: self.database.groups.find_one_and_update({"courseid": course.get_id()}, {"$push": { "students": {"$each": group["students"]} }}) self.database.groups.delete_one({"_id": ObjectId(classid)}) msg["groups"] = _("Audience updated.") active_tab = "tab_groups" if "upload_groups" in data or "groups" in data: try: if "upload_groups" in data: self.database.groups.delete_many({"courseid": course.get_id()}) groups = custom_yaml.load(data["groupfile"].read()) else: groups = json.loads(data["groups"]) for index, new_group in enumerate(groups): # In case of file upload, no id specified new_group['_id'] = new_group['_id'] if '_id' in new_group else 'None' # Update the group group, errors = self.update_group(course, new_group['_id'], new_group, audience_students) errored_students += errors if len(errored_students) > 0: msg["groups"] = _("Changes couldn't be applied for following students :") + "<ul>" for student in errored_students: msg["groups"] += "<li>" + student + "</li>" msg["groups"] += "</ul>" error["groups"] = True elif not error: msg["groups"] = _("Groups updated.") except: msg["groups"] = _('An error occurred while parsing the data.') error["groups"] = True active_tab = "tab_groups" return active_tab
[docs] def get_user_lists(self, course): """ Get the available student list for group edition""" audience_list = self.user_manager.get_course_audiences(course) audience_list = {audience["_id"]: audience for audience in audience_list} student_list = self.user_manager.get_course_registered_users(course, False) users_info = self.user_manager.get_users_info(student_list) groups_list = list(self.database.groups.aggregate([ {"$match": {"courseid": course.get_id()}}, {"$unwind": "$students"}, {"$project": { "group": "$_id", "students": 1 }} ])) groups_list = {d["students"]: d["group"] for d in groups_list} other_students = [entry for entry in student_list if entry not in groups_list] other_students = sorted(other_students, key=lambda val: (("0" + users_info[val].realname) if users_info[val] else ("1" + val))) return student_list, audience_list, other_students, users_info
[docs] def update_group(self, course, groupid, new_data, audience_students): """ Update group and returns a list of errored students""" student_list = self.user_manager.get_course_registered_users(course, False) # If group is new if groupid == 'None': # Remove _id for correct insertion del new_data['_id'] new_data["courseid"] = course.get_id() # Insert the new group result = self.database.groups.insert_one(new_data) # Retrieve new group id groupid = result.inserted_id new_data['_id'] = result.inserted_id group = new_data else: group = self.database.groups.find_one({"_id": ObjectId(groupid), "courseid": course.get_id()}) # Convert audience ids to ObjectId new_data["audiences"] = [ObjectId(s) for s in new_data["audiences"]] students, errored_students = [], [] if len(new_data["students"]) <= new_data["size"]: # Check the students for student in new_data["students"]: student_allowed_in_group = any( set(audience_students.get(student, [])).intersection(new_data["audiences"])) if student in student_list and (student_allowed_in_group or not new_data["audiences"]): # Remove user from the other group self.database.groups.find_one_and_update({"courseid": course.get_id(), "students": student}, {"$pull": {"students": student}}) students.append(student) else: errored_students.append(student) new_data["students"] = students group = self.database.groups.find_one_and_update( {"_id": ObjectId(groupid)}, {"$set": {"description": new_data["description"], "audiences": new_data["audiences"], "size": new_data["size"], "students": students}}, return_document=ReturnDocument.AFTER) return group, errored_students
[docs] def update_audience(self, course, audienceid, new_data): """ Update audience and returns a list of errored students""" student_list = self.user_manager.get_course_registered_users(course, False) # If audience is new if audienceid == 'None': # Remove _id for correct insertion del new_data['_id'] new_data["courseid"] = course.get_id() # Insert the new audience result = self.database.audiences.insert_one(new_data) # Retrieve new audience id audienceid = result.inserted_id new_data['_id'] = result.inserted_id audience = new_data else: audience = self.database.audiences.find_one({"_id": ObjectId(audienceid), "courseid": course.get_id()}) # Check tutors new_data["tutors"] = [tutor for tutor in new_data["tutors"] if tutor in course.get_admins()] students, errored_students = [], [] # Check the students for student in new_data["students"]: if student in student_list: students.append(student) else: # Check if user can be registered user_info = self.user_manager.get_user_info(student) if user_info is None or student in audience["tutors"]: errored_students.append(student) else: self.user_manager.course_register_user(course, student, force=True) students.append(student) removed_students = [student for student in audience["students"] if student not in new_data["students"]] self.database.audiences.find_one_and_update({"courseid": course.get_id()}, {"$push": {"students": {"$each": removed_students}}}) new_data["students"] = students audience = self.database.audiences.find_one_and_update( {"_id": ObjectId(audienceid)}, {"$set": {"description": new_data["description"], "students": students, "tutors": new_data["tutors"]}}, return_document=ReturnDocument.AFTER) return audience, errored_students