# Source code for inginious.agent.mcq_agent

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
import json
import logging
import gettext

from inginious.agent import Agent, CannotCreateJobException
from inginious import get_root_path
from inginious.common.messages import BackendNewJob, BackendKillJob
import os.path
import builtins


class MCQAgent(Agent):
    """
    Agent handling jobs of the ``mcq`` environment.

    Answers are verified directly through the problem objects built from the
    job's problem descriptions (see :meth:`check_answer`); a task that would
    need anything to be launched is rejected as misconfigured.
    """

    def __init__(self, context, backend_addr, friendly_name, concurrency, tasks_filesystem, problem_types):
        """
        :param context: ZeroMQ context for this process
        :param backend_addr: address of the backend (for example, "tcp://127.0.0.1:2222")
        :param friendly_name: a string containing a friendly name to identify agent
        :param concurrency: forwarded unchanged to the base ``Agent`` constructor
        :param tasks_filesystem: FileSystemProvider to the taskset/tasks
        :param problem_types: Problem types dictionary
        """
        super().__init__(context, backend_addr, friendly_name, concurrency, tasks_filesystem)
        self._logger = logging.getLogger("inginious.agent.mcq")
        self._problem_types = problem_types

        # Init gettext: "en" is the untranslated fallback, every sub-directory
        # of the i18n folder is treated as an available language.
        i18n_dir = get_root_path() + '/agent/mcq_agent/i18n'
        self._translations = {"en": gettext.NullTranslations()}
        available_translations = [
            entry for entry in os.listdir(i18n_dir)
            if os.path.isdir(os.path.join(i18n_dir, entry))
        ]
        self._translations.update({
            lang: gettext.translation('messages', i18n_dir, [lang])
            for lang in available_translations
        })

    @property
    def environments(self):
        """ Return the (static) description of the single ``mcq`` environment provided by this agent. """
        return {"mcq": {"mcq": {"id": "mcq", "created": 0}}}

    def check_answer(self, problems, task_input, language):
        """
            Verify the answers in task_input. Returns seven values:

            1. True the input is **currently** valid. (may become invalid after running the code), False else
            2. True if the input needs to be run in the VM, False else
            3. Main message, as a list (that can be join with ``\\n`` or ``<br/>`` for example)
            4. Problem specific message, as a dictionary (tuple of result/text)
            5. Number of subproblems that (already) contain errors. <= Number of subproblems
            6. Number of errors in MCQ problems. Not linked to the number of subproblems
            7. The state returned by each problem, as a JSON-encoded dictionary indexed by problem id

            :param problems: iterable of problem objects exposing ``get_id()`` and ``check_answer(task_input, language)``
            :param task_input: the input of the submission, forwarded as-is to each problem
            :param language: the language code, forwarded as-is to each problem
        """
        valid = True
        need_launch = False
        main_message = []
        problem_messages = {}
        error_count = 0
        multiple_choice_error_count = 0
        states = {}
        for problem in problems:
            (problem_is_valid, problem_main_message, problem_s_messages,
             problem_mc_error_count, state) = problem.check_answer(task_input, language)
            problem_id = problem.get_id()
            states[problem_id] = state
            if problem_is_valid is None:
                # The problem cannot decide by itself: something would have to be run.
                need_launch = True
            elif problem_is_valid == False:  # noqa: E712 -- kept as an equality test on purpose
                error_count += 1
                valid = False
            if problem_main_message is not None:
                main_message.append(problem_main_message)
            if problem_s_messages is not None:
                problem_messages[problem_id] = (
                    "success" if problem_is_valid else "failed",
                    problem_s_messages,
                )
            multiple_choice_error_count += problem_mc_error_count
        return (valid, need_launch, main_message, problem_messages, error_count,
                multiple_choice_error_count, json.dumps(states))

    @staticmethod
    def _find_translations_fs(taskset_fs, task_fs):
        """ Return the first existing ``$i18n`` folder of the task/taskset, or None if there is none. """
        # Ordered by priority; each candidate is only resolved if the previous ones do not exist.
        candidates = (
            (task_fs, ("$i18n",)),
            (task_fs, ("student", "$i18n")),
            (taskset_fs, ("$common", "$i18n")),
            (taskset_fs, ("$common", "student", "$i18n")),
        )
        for base_fs, subfolders in candidates:
            candidate_fs = base_fs
            for subfolder in subfolders:
                candidate_fs = candidate_fs.from_subfolder(subfolder)
            if candidate_fs.exists():
                return candidate_fs
        return None

    @staticmethod
    def _load_task_translation(taskset_fs, task_fs, language):
        """ Load the task-specific catalogue for ``language``; NullTranslations when none is available. """
        translations_fs = MCQAgent._find_translations_fs(taskset_fs, task_fs)
        catalog_name = language + ".mo"
        if translations_fs is None or not translations_fs.exists(catalog_name):
            return gettext.NullTranslations()

        catalog_fd = translations_fs.get_fd(catalog_name)
        try:
            # GNUTranslations parses the whole catalogue in its constructor,
            # so the file object is not needed afterwards.
            return gettext.GNUTranslations(catalog_fd)
        finally:
            # NOTE(review): assumes get_fd() returns a regular file-like object
            # (with close()) -- confirm against the FileSystemProvider used.
            catalog_fd.close()

    async def new_job(self, msg: BackendNewJob):
        """
        Check the answers of a new job and send its result to the backend.

        :param msg: the BackendNewJob message describing the job
        :raises CannotCreateJobException: if a problem has an unknown type, or if
            at least one problem cannot be verified without launching something
            (i.e. the task is not a pure MCQ)
        """
        language = msg.inputdata.get("@lang", "")
        previous_state = msg.inputdata.get("@state", "")
        translation = self._translations.get(language, gettext.NullTranslations())

        # The local binding guarantees that the messages built below use this
        # job's language whatever happens to the builtin afterwards.
        _ = translation.gettext
        # TODO: this would probably require a refactor.
        # This may pose problem with apps that start multiple MCQAgents in the same process...
        # The builtin is still installed in case code called from here relies on it.
        builtins.__dict__['_'] = _

        taskset_fs = self._fs.from_subfolder(msg.taskset_id)
        task_fs = taskset_fs.from_subfolder(msg.task_id)
        translations = {language: self._load_task_translation(taskset_fs, task_fs, language)}

        task_problems = msg.task_problems
        problems = []
        for problemid, problem_content in task_problems.items():
            problem_type = problem_content.get('type', "")
            problem_class = self._problem_types.get(problem_type)
            if problem_class is None:
                # Fail with an explicit message rather than a "NoneType is not callable" error.
                self._logger.warning("Task %s/%s: problem %s has unknown type %r",
                                     msg.taskset_id, msg.task_id, problemid, problem_type)
                raise CannotCreateJobException("Unknown problem type for problem " + str(problemid))
            problems.append(problem_class(problemid, problem_content, translations, task_fs))

        (result, need_emul, text, problem_messages,
         error_count, mcq_error_count, state) = self.check_answer(problems, msg.inputdata, language)

        if need_emul:
            self._logger.warning("Task %s/%s is not a pure MCQ but has env=MCQ", msg.taskset_id, msg.task_id)
            raise CannotCreateJobException("Task wrongly configured as a MCQ")

        # Problems may answer with these keys instead of a final text;
        # they are replaced by their translated message here.
        internal_messages = {
            "_wrong_answer_multiple": _("Wrong answer. Make sure to select all the valid possibilities"),
            "_wrong_answer": _("Wrong answer"),
            "_correct_answer": _("Correct answer"),
        }

        for key, (p_result, messages) in problem_messages.items():
            translated = [internal_messages.get(message, message) for message in messages]
            problem_messages[key] = (p_result, "\n\n".join(translated))

        if error_count != 0:
            text.append(_("You have {} wrong answer(s).").format(error_count))
        if mcq_error_count != 0:
            text.append("\n\n" + _("Among them, you have {} invalid answers in the multiple choice questions").format(mcq_error_count))

        nb_subproblems = len(task_problems)
        if nb_subproblems == 0:
            # Nothing to grade: report the problem and give back the state received with the job.
            grade = 0.0
            text.append("No subproblems defined")
            await self.send_job_result(msg.job_id, "crashed", "\n".join(text), grade, problem_messages, {}, {},
                                       previous_state, None)
        else:
            # The grade is the percentage of subproblems without error.
            grade = 100.0 * float(nb_subproblems - error_count) / float(nb_subproblems)
            await self.send_job_result(msg.job_id, ("success" if result else "failed"), "\n".join(text), grade,
                                       problem_messages, {}, {}, state, None)

    async def kill_job(self, message: BackendKillJob):
        """ Do nothing: this agent takes no action when asked to kill a job. """
        pass