Source code for inginious.agent.docker_agent

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.

import asyncio
import base64
import logging
import os
import shutil
import struct
import tempfile
from os.path import join as path_join

import msgpack
import psutil
from inginious.agent.docker_agent._docker_interface import DockerInterface

from inginious.agent import Agent, CannotCreateJobException
from inginious.agent.docker_agent._timeout_watcher import TimeoutWatcher
from inginious.common.asyncio_utils import AsyncIteratorWrapper, AsyncProxy
from inginious.common.base import id_checker, id_checker_tests
from inginious.common.filesystems.provider import FileSystemProvider
from inginious.common.messages import BackendNewJob, BackendKillJob


class DockerAgent(Agent):
    def __init__(self, context, backend_addr, friendly_name, concurrency, tasks_fs: FileSystemProvider,
                 ssh_host=None, ssh_ports=None, tmp_dir="./agent_tmp"):
        """
        :param context: ZeroMQ context for this process
        :param backend_addr: address of the backend (for example, "tcp://127.0.0.1:2222")
        :param friendly_name: a string containing a friendly name to identify the agent
        :param concurrency: number of simultaneous jobs that can be run by this agent
        :param tasks_fs: FileSystemProvider for the courses / tasks
        :param ssh_host: hostname/ip/... to which external clients should connect to access an ssh remote debug session
        :param ssh_ports: iterable containing ports to which the docker instance can assign ssh servers (for remote debugging)
        :param tmp_dir: temp dir that is used by the agent to start new containers
        """
        super(DockerAgent, self).__init__(context, backend_addr, friendly_name, concurrency, tasks_fs)
        self._logger = logging.getLogger("inginious.agent.docker")

        self._max_memory_per_slot = int(psutil.virtual_memory().total / concurrency / 1024 / 1024)

        self.tasks_fs = tasks_fs

        # Temp dir
        self._tmp_dir = tmp_dir

        # SSH remote debug
        self._ssh_host = ssh_host
        self._ssh_ports = set(ssh_ports) if ssh_ports is not None else set()

        # Async proxies to os and shutil
        self._aos = AsyncProxy(os)
        self._ashutil = AsyncProxy(shutil)

    async def _init_clean(self):
        """ Must be called when the agent is starting """
        # Data about running containers
        self._containers_running = {}
        self._container_for_job = {}
        self._student_containers_running = {}
        self._student_containers_for_job = {}

        self._containers_killed = dict()

        # Delete tmp_dir, and recreate it
        try:
            await self._ashutil.rmtree(self._tmp_dir)
        except OSError:
            pass

        try:
            await self._aos.mkdir(self._tmp_dir)
        except OSError:
            pass

        # Docker
        self._docker = AsyncProxy(DockerInterface())

        # Auto discover containers
        self._logger.info("Discovering containers")
        self._containers = await self._docker.get_containers()

        self._running_ssh_debug = {}  # container_id: ssh_port
        if self._ssh_host is None and len(self._containers) != 0:
            self._logger.info("Guessing external host IP")
            self._ssh_host = await self._docker.get_host_ip(next(iter(self._containers.values()))["id"])
        if self._ssh_host is None:
            self._logger.warning("Cannot find external host IP. Please indicate it in the configuration. "
                                 "Remote SSH debug has been deactivated.")
            self._ssh_ports = None
        else:
            self._logger.info("External address for SSH remote debug is %s", self._ssh_host)

        # Watchers
        self._timeout_watcher = TimeoutWatcher(self._docker)

    async def _end_clean(self):
        """ Must be called when the agent is closing """
        await self._timeout_watcher.clean()

        async def close_and_delete(container_id):
            try:
                await self._docker.remove_container(container_id)
            except:
                pass

        for container_id in self._containers_running:
            await close_and_delete(container_id)
        for container_id in self._student_containers_running:
            await close_and_delete(container_id)

    @property
    def environments(self):
        return self._containers

    async def _watch_docker_events(self):
        """ Get raw docker events, convert them to more readable objects, and then give them to self._docker_events_subscriber """
        try:
            source = AsyncIteratorWrapper(self._docker.sync.event_stream(filters={"event": ["die", "oom"]}))
            async for i in source:
                if i["Type"] == "container" and i["status"] == "die":
                    container_id = i["id"]
                    try:
                        retval = int(i["Actor"]["Attributes"]["exitCode"])
                    except asyncio.CancelledError:
                        raise
                    except:
                        self._logger.exception("Cannot parse exitCode for container %s", container_id)
                        retval = -1

                    if container_id in self._containers_running:
                        self._create_safe_task(self.handle_job_closing(container_id, retval))
                    elif container_id in self._student_containers_running:
                        self._create_safe_task(self.handle_student_job_closing(container_id, retval))
                elif i["Type"] == "container" and i["status"] == "oom":
                    container_id = i["id"]
                    if container_id in self._containers_running or container_id in self._student_containers_running:
                        self._logger.info("Container %s did OOM, killing it", container_id)
                        self._containers_killed[container_id] = "overflow"
                        try:
                            self._create_safe_task(self._docker.kill_container(container_id))
                        except asyncio.CancelledError:
                            raise
                        except:  # this call can sometimes fail, and that is normal.
                            pass
                else:
                    raise TypeError(str(i))
        except asyncio.CancelledError:
            pass
        except:
            self._logger.exception("Exception in _watch_docker_events")

    def __new_job_sync(self, message, future_results):
        """ Synchronous part of _new_job. Creates the needed directories, copies files, and starts the container. """
        course_id = message.course_id
        task_id = message.task_id

        debug = message.debug
        environment_name = message.environment
        enable_network = message.enable_network
        time_limit = message.time_limit
        hard_time_limit = message.hard_time_limit or time_limit * 3
        mem_limit = message.mem_limit

        course_fs = self.tasks_fs.from_subfolder(course_id)
        task_fs = course_fs.from_subfolder(task_id)

        if not course_fs.exists() or not task_fs.exists():
            self._logger.warning("Task %s/%s unavailable on this agent", course_id, task_id)
            raise CannotCreateJobException('Task unavailable on agent. Please retry later, the agents should synchronize soon. '
                                           'If the error persists, please contact your course administrator.')

        # Check for a realistic memory limit value
        if mem_limit < 20:
            mem_limit = 20
        elif mem_limit > self._max_memory_per_slot:
            self._logger.warning("Task %s/%s asks for too much memory (%dMB)! Available: %dMB", course_id, task_id,
                                 mem_limit, self._max_memory_per_slot)
            raise CannotCreateJobException('Not enough memory on agent (available: %dMB). Please contact your course administrator.'
                                           % self._max_memory_per_slot)

        if environment_name not in self._containers:
            self._logger.warning("Task %s/%s asks for an unknown environment %s (not in aliases)", course_id, task_id,
                                 environment_name)
            raise CannotCreateJobException('Unknown container. Please contact your course administrator.')

        environment = self._containers[environment_name]["id"]

        # Handle ssh debugging
        ssh_port = None
        if debug == "ssh":
            # allow 30 minutes of real time.
            time_limit = 30 * 60
            hard_time_limit = 30 * 60

            # select a port
            if len(self._ssh_ports) == 0:
                self._logger.warning("User asked for an ssh debug but no ports are available")
                raise CannotCreateJobException('No ports are available for SSH debug right now. Please retry later.')
            ssh_port = self._ssh_ports.pop()

        # Create directories for storing all the data for the job
        try:
            container_path = tempfile.mkdtemp(dir=self._tmp_dir)
        except Exception as e:
            self._logger.error("Cannot make container temp directory! %s", str(e), exc_info=True)
            if ssh_port is not None:
                self._ssh_ports.add(ssh_port)
            raise CannotCreateJobException('Cannot make container temp directory.')

        task_path = path_join(container_path, 'task')  # tmp_dir/id/task/
        course_path = path_join(container_path, 'course')
        sockets_path = path_join(container_path, 'sockets')  # tmp_dir/id/sockets/
        student_path = path_join(task_path, 'student')  # tmp_dir/id/task/student/
        systemfiles_path = path_join(task_path, 'systemfiles')  # tmp_dir/id/task/systemfiles/

        course_common_path = path_join(course_path, 'common')
        course_common_student_path = path_join(course_path, 'common', 'student')

        # Create the needed directories
        os.mkdir(sockets_path)
        os.chmod(container_path, 0o777)
        os.chmod(sockets_path, 0o777)

        os.mkdir(course_path)

        # TODO: avoid copy
        task_fs.copy_from(None, task_path)
        os.chmod(task_path, 0o777)

        if not os.path.exists(student_path):
            os.mkdir(student_path)
            os.chmod(student_path, 0o777)

        # Copy common and common/student if needed
        # TODO: avoid copy
        if course_fs.from_subfolder("$common").exists():
            course_fs.from_subfolder("$common").copy_from(None, course_common_path)
        else:
            os.mkdir(course_common_path)

        if course_fs.from_subfolder("$common").from_subfolder("student").exists():
            course_fs.from_subfolder("$common").from_subfolder("student").copy_from(None, course_common_student_path)
        else:
            os.mkdir(course_common_student_path)

        # Run the container
        try:
            container_id = self._docker.sync.create_container(environment, enable_network, mem_limit, task_path,
                                                              sockets_path, course_common_path,
                                                              course_common_student_path, ssh_port)
        except Exception as e:
            self._logger.warning("Cannot create container! %s", str(e), exc_info=True)
            shutil.rmtree(container_path)
            if ssh_port is not None:
                self._ssh_ports.add(ssh_port)
            raise CannotCreateJobException('Cannot create container.')

        # Store info
        self._containers_running[container_id] = message, container_path, future_results
        self._container_for_job[message.job_id] = container_id
        self._student_containers_for_job[message.job_id] = set()

        if ssh_port is not None:
            self._running_ssh_debug[container_id] = ssh_port

        try:
            # Start the container
            self._docker.sync.start_container(container_id)
        except Exception as e:
            self._logger.warning("Cannot start container! %s", str(e), exc_info=True)
            shutil.rmtree(container_path)
            if ssh_port is not None:
                self._ssh_ports.add(ssh_port)
            raise CannotCreateJobException('Cannot start container')

        return {
            "job_id": message.job_id,
            "container_id": container_id,
            "inputdata": message.inputdata,
            "debug": debug,
            "ssh_port": ssh_port,
            "orig_env": environment_name,
            "orig_memory_limit": mem_limit,
            "orig_time_limit": time_limit,
            "orig_hard_time_limit": hard_time_limit,
            "sockets_path": sockets_path,
            "student_path": student_path,
            "systemfiles_path": systemfiles_path,
            "course_common_student_path": course_common_student_path
        }
    async def new_job(self, message: BackendNewJob):
        """ Handles a new job: starts the grading container """
        self._logger.info("Received request for jobid %s", message.job_id)

        future_results = asyncio.Future()

        out = await self._loop.run_in_executor(None, lambda: self.__new_job_sync(message, future_results))

        self._create_safe_task(self.handle_running_container(**out, future_results=future_results))

        await self._timeout_watcher.register_container(out["container_id"], out["orig_time_limit"],
                                                       out["orig_hard_time_limit"])
    async def create_student_container(self, job_id, parent_container_id, sockets_path, student_path,
                                       systemfiles_path, course_common_student_path, socket_id, environment_name,
                                       memory_limit, time_limit, hard_time_limit, share_network, write_stream):
        """
        Creates a new student container.

        :param write_stream: stream on which to write the return value of the container (with a correctly formatted msgpack message)
        """
        try:
            self._logger.debug("Starting new student container... %s %s %s %s", environment_name, memory_limit,
                               time_limit, hard_time_limit)

            if environment_name not in self._containers:
                self._logger.warning("Student container asked for an unknown environment %s (not in aliases)",
                                     environment_name)
                await self._write_to_container_stdin(write_stream, {"type": "run_student_retval", "retval": 254,
                                                                    "socket_id": socket_id})
                return

            environment = self._containers[environment_name]["id"]

            try:
                socket_path = path_join(sockets_path, str(socket_id) + ".sock")
                container_id = await self._docker.create_container_student(parent_container_id, environment,
                                                                           share_network, memory_limit, student_path,
                                                                           socket_path, systemfiles_path,
                                                                           course_common_student_path)
            except Exception as e:
                self._logger.exception("Cannot create student container!")
                await self._write_to_container_stdin(write_stream, {"type": "run_student_retval", "retval": 254,
                                                                    "socket_id": socket_id})
                if isinstance(e, asyncio.CancelledError):
                    raise
                return

            self._student_containers_for_job[job_id].add(container_id)
            self._student_containers_running[container_id] = job_id, parent_container_id, socket_id, write_stream

            # tell the grading container that the sibling has started
            await self._write_to_container_stdin(write_stream, {"type": "run_student_started", "socket_id": socket_id})

            try:
                await self._docker.start_container(container_id)
            except Exception as e:
                self._logger.exception("Cannot start student container!")
                await self._write_to_container_stdin(write_stream, {"type": "run_student_retval", "retval": 254,
                                                                    "socket_id": socket_id})
                if isinstance(e, asyncio.CancelledError):
                    raise
                return

            # Verify the time limit
            await self._timeout_watcher.register_container(container_id, time_limit, hard_time_limit)
        except asyncio.CancelledError:
            raise
        except:
            self._logger.exception("Exception in create_student_container")
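For reference, the `run_student` round trip served by this method uses dicts of the following shape. The field names come from this module (the request is parsed in handle_running_container, and the two replies are written above); the concrete values below are illustrative only.

# Illustrative message shapes; values are made up, field names come from this module.
run_student_request = {            # sent by the grading container on its stdout
    "type": "run_student",
    "environment": None,           # None/falsy -> reuse the grading container's environment
    "memory_limit": 0,             # 0/falsy -> inherit; otherwise capped at the original limit
    "time_limit": 0,
    "hard_time_limit": 0,
    "share_network": False,
    "socket_id": "1",              # must not contain "/"
}
reply_started = {"type": "run_student_started", "socket_id": "1"}
reply_done = {"type": "run_student_retval", "retval": 0, "socket_id": "1"}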
    async def _write_to_container_stdin(self, write_stream, message):
        """
        Send a message to the stdin of a container, with the right framing

        :param write_stream: asyncio write stream to the stdin of the container
        :param message: dict to be msgpacked and sent
        """
        msg = msgpack.dumps(message, encoding="utf8", use_bin_type=True)
        self._logger.debug("Sending %i bytes to container", len(msg))
        write_stream.write(struct.pack('I', len(msg)))
        write_stream.write(msg)
        await write_stream.drain()
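A grading container sees the mirror image of this framing on its standard streams: a native-endian 4-byte length prefix followed by a msgpack-encoded dict. The following standalone sketch (not part of INGInious) shows how a process inside the container could read and write such messages; it uses the current msgpack API (`raw=False`) instead of the deprecated `encoding=` argument used above, and the message contents are illustrative.

# Minimal sketch of the length-prefixed msgpack framing, container side.
import struct
import sys

import msgpack


def read_message(stream):
    """Read one length-prefixed msgpack message from a binary stream, or None at EOF."""
    header = stream.read(4)
    if len(header) < 4:
        return None
    (length,) = struct.unpack('I', header)          # native-endian, matching struct.pack('I', ...) above
    return msgpack.unpackb(stream.read(length), use_list=False, raw=False)


def write_message(stream, message):
    """Write one length-prefixed msgpack message to a binary stream."""
    payload = msgpack.dumps(message, use_bin_type=True)
    stream.write(struct.pack('I', len(payload)) + payload)
    stream.flush()


if __name__ == "__main__":
    start_msg = read_message(sys.stdin.buffer)       # e.g. {"type": "start", "input": ..., "debug": ...}
    write_message(sys.stdout.buffer,                 # shape mirrors what handle_running_container expects
                  {"type": "result", "result": {"result": "success", "text": "ok"}})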
    async def handle_running_container(self, job_id, container_id, inputdata, debug, ssh_port, orig_env,
                                       orig_memory_limit, orig_time_limit, orig_hard_time_limit, sockets_path,
                                       student_path, systemfiles_path, course_common_student_path, future_results):
        """ Talk with a container. Sends the initial input. Allows the container to start student containers. """
        sock = await self._docker.attach_to_container(container_id)
        try:
            read_stream, write_stream = await asyncio.open_connection(sock=sock.get_socket())
        except asyncio.CancelledError:
            raise
        except:
            self._logger.exception("Exception occurred while creating read/write stream to container")
            return None

        # Send hello msg
        await self._write_to_container_stdin(write_stream, {"type": "start", "input": inputdata, "debug": debug})

        result = None

        buffer = bytearray()
        try:
            while not read_stream.at_eof():
                msg_header = await read_stream.readexactly(8)
                outtype, length = struct.unpack_from('>BxxxL', msg_header)  # format imposed by docker in the attach endpoint
                if length != 0:
                    content = await read_stream.readexactly(length)
                    if outtype == 1:  # stdout
                        buffer += content
                    if outtype == 2:  # stderr
                        self._logger.debug("Received stderr from containers:\n%s", content)

                    # The first 4 bytes are the length of the message. If we have a complete message...
                    while len(buffer) > 4 and len(buffer) >= 4 + struct.unpack('I', buffer[0:4])[0]:
                        msg_encoded = buffer[4:4 + struct.unpack('I', buffer[0:4])[0]]  # ... get it
                        buffer = buffer[4 + struct.unpack('I', buffer[0:4])[0]:]  # ... withdraw it from the buffer
                        try:
                            msg = msgpack.unpackb(msg_encoded, encoding="utf8", use_list=False)
                            self._logger.debug("Received msg %s from container %s", msg["type"], container_id)
                            if msg["type"] == "run_student":
                                # start a new student container
                                environment = msg["environment"] or orig_env
                                memory_limit = min(msg["memory_limit"] or orig_memory_limit, orig_memory_limit)
                                time_limit = min(msg["time_limit"] or orig_time_limit, orig_time_limit)
                                hard_time_limit = min(msg["hard_time_limit"] or orig_hard_time_limit, orig_hard_time_limit)
                                share_network = msg["share_network"]
                                socket_id = msg["socket_id"]
                                assert "/" not in socket_id  # ensure task creators do not try to break the agent :-(
                                self._create_safe_task(self.create_student_container(job_id, container_id, sockets_path,
                                                                                     student_path, systemfiles_path,
                                                                                     course_common_student_path,
                                                                                     socket_id, environment,
                                                                                     memory_limit, time_limit,
                                                                                     hard_time_limit, share_network,
                                                                                     write_stream))
                            elif msg["type"] == "ssh_key":
                                # send the data to the backend (and client)
                                self._logger.info("%s %s", self._running_ssh_debug[container_id], str(msg))
                                await self.send_ssh_job_info(job_id, self._ssh_host, ssh_port, msg["ssh_key"])
                            elif msg["type"] == "result":
                                # last message, containing the results of the container
                                result = msg["result"]
                        except:
                            self._logger.exception("Received incorrect message from container %s (job id %s)",
                                                   container_id, job_id)
        except asyncio.IncompleteReadError:
            self._logger.debug("Container output ended with an IncompleteReadError; it was probably killed.")
        except asyncio.CancelledError:
            write_stream.close()
            sock.close_socket()
            future_results.set_result(result)
            raise
        except:
            self._logger.exception("Exception while reading container %s output", container_id)

        write_stream.close()
        sock.close_socket()
        future_results.set_result(result)

        if not result:
            self._logger.warning("Container %s has not given any result", container_id)
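The 8-byte header unpacked with `'>BxxxL'` above is Docker's attach-stream multiplexing format: one byte for the stream type (1 for stdout, 2 for stderr), three padding bytes, and a big-endian 32-bit payload length. A small standalone sketch of that demultiplexing, with made-up frame data:

# Illustrative demultiplexing of a raw Docker attach stream (frame values are invented).
import struct


def split_docker_frames(data: bytes):
    """Yield (stream_type, payload) tuples from a multiplexed attach stream held in memory."""
    offset = 0
    while offset + 8 <= len(data):
        stream_type, length = struct.unpack_from('>BxxxL', data, offset)
        yield stream_type, data[offset + 8:offset + 8 + length]
        offset += 8 + length


raw = struct.pack('>BxxxL', 1, 5) + b"hello" + struct.pack('>BxxxL', 2, 3) + b"err"
assert list(split_docker_frames(raw)) == [(1, b"hello"), (2, b"err")]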
    async def handle_student_job_closing(self, container_id, retval):
        """
        Handle a closing student container. Do some cleaning, verify memory limits, timeouts, ... and return data to
        the associated grading container.
        """
        try:
            self._logger.debug("Closing student %s", container_id)
            try:
                job_id, parent_container_id, socket_id, write_stream = self._student_containers_running[container_id]
                del self._student_containers_running[container_id]
            except asyncio.CancelledError:
                raise
            except:
                self._logger.warning("Student container %s that has finished(p1) was not launched by this agent",
                                     str(container_id), exc_info=True)
                return

            # Delete remaining student containers
            if job_id in self._student_containers_for_job:  # if it does not exist, the parent container has closed
                self._student_containers_for_job[job_id].remove(container_id)

            killed = await self._timeout_watcher.was_killed(container_id)
            if container_id in self._containers_killed:
                killed = self._containers_killed[container_id]
                del self._containers_killed[container_id]

            if killed == "timeout":
                retval = 253
            elif killed == "overflow":
                retval = 252

            try:
                await self._write_to_container_stdin(write_stream, {"type": "run_student_retval", "retval": retval,
                                                                    "socket_id": socket_id})
            except asyncio.CancelledError:
                raise
            except:
                pass  # parent container closed

            # Do not forget to remove the container
            try:
                await self._docker.remove_container(container_id)
            except asyncio.CancelledError:
                raise
            except:
                pass  # ignore
        except asyncio.CancelledError:
            raise
        except:
            self._logger.exception("Exception in handle_student_job_closing")
    async def handle_job_closing(self, container_id, retval):
        """
        Handle a closing grading container. Do some cleaning, verify memory limits, timeouts, ... and return data to
        the backend.
        """
        try:
            self._logger.debug("Closing %s", container_id)
            try:
                message, container_path, future_results = self._containers_running[container_id]
                del self._containers_running[container_id]
            except asyncio.CancelledError:
                raise
            except:
                self._logger.warning("Container %s that has finished(p1) was not launched by this agent",
                                     str(container_id), exc_info=True)
                return

            # Close sub containers
            for student_container_id_loop in self._student_containers_for_job[message.job_id]:
                # little hack to ensure the value of student_container_id_loop is copied into the closure
                async def close_and_delete(student_container_id=student_container_id_loop):
                    try:
                        await self._docker.kill_container(student_container_id)
                        await self._docker.remove_container(student_container_id)
                    except asyncio.CancelledError:
                        raise
                    except:
                        pass  # ignore
                self._create_safe_task(close_and_delete(student_container_id_loop))
            del self._student_containers_for_job[message.job_id]

            # Allow other containers to reuse the ssh port this container has finished using
            if container_id in self._running_ssh_debug:
                self._ssh_ports.add(self._running_ssh_debug[container_id])
                del self._running_ssh_debug[container_id]

            # Verify if the container was killed, either by the client, by an OOM or by a timeout
            killed = await self._timeout_watcher.was_killed(container_id)
            if container_id in self._containers_killed:
                killed = self._containers_killed[container_id]
                if container_id in self._running_ssh_debug:
                    self._ssh_ports.add(self._running_ssh_debug[container_id])
                del self._containers_killed[container_id]

            stdout = ""
            stderr = ""
            result = "crash" if retval == -1 else None
            error_msg = None
            grade = None
            problems = {}
            custom = {}
            tests = {}
            archive = None

            if killed is not None:
                result = killed

            # If everything went well, continue to retrieve the status from the container
            if result is None:
                # Get logs back
                try:
                    return_value = await future_results

                    # Accepted types for the return dict
                    accepted_types = {"stdout": str, "stderr": str, "result": str, "text": str, "grade": float,
                                      "problems": dict, "custom": dict, "tests": dict, "archive": str}

                    keys_fct = {"problems": id_checker, "custom": id_checker, "tests": id_checker_tests}

                    # Check dict content
                    for key, item in return_value.items():
                        if not isinstance(item, accepted_types[key]):
                            raise Exception("Feedback file is badly formatted.")
                        elif accepted_types[key] == dict and key != "custom":  # custom can contain anything
                            for sub_key, sub_item in item.items():
                                if not keys_fct[key](sub_key) or isinstance(sub_item, dict):
                                    raise Exception("Feedback file is badly formatted.")

                    # Set output fields
                    stdout = return_value.get("stdout", "")
                    stderr = return_value.get("stderr", "")
                    result = return_value.get("result", "error")
                    error_msg = return_value.get("text", "")
                    grade = return_value.get("grade", None)
                    problems = return_value.get("problems", {})
                    custom = return_value.get("custom", {})
                    tests = return_value.get("tests", {})
                    archive = return_value.get("archive", None)
                    if archive is not None:
                        archive = base64.b64decode(archive)
                except Exception as e:
                    self._logger.exception("Cannot get back output of container %s! (%s)", container_id, str(e))
                    result = "crash"
                    error_msg = 'The grader did not return a readable output : {}'.format(str(e))

            # Default values
            if error_msg is None:
                error_msg = ""
            if grade is None:
                if result == "success":
                    grade = 100.0
                else:
                    grade = 0.0

            # Remove the container
            try:
                await self._docker.remove_container(container_id)
            except asyncio.CancelledError:
                raise
            except:
                pass

            # Delete folders
            try:
                await self._ashutil.rmtree(container_path)
            except PermissionError:
                self._logger.debug("Cannot remove old container path!")
                pass  # todo: run a docker container to force removal

            # Return!
            await self.send_job_result(message.job_id, result, error_msg, grade, problems, tests, custom, archive,
                                       stdout, stderr)

            # Do not forget to remove data from internal state
            del self._container_for_job[message.job_id]
        except asyncio.CancelledError:
            raise
        except:
            self._logger.exception("Exception in handle_job_closing")
    async def kill_job(self, message: BackendKillJob):
        """ Handles `kill` messages. Kill things. """
        try:
            if message.job_id in self._container_for_job:
                self._containers_killed[self._container_for_job[message.job_id]] = "killed"
                await self._docker.kill_container(self._container_for_job[message.job_id])
            else:
                self._logger.warning("Cannot kill container for job %s because it is not running", str(message.job_id))
        except asyncio.CancelledError:
            raise
        except:
            self._logger.exception("Exception in handle_kill_job")
    async def run(self):
        await self._init_clean()

        # Init Docker events watcher
        watcher_docker_event = self._create_safe_task(self._watch_docker_events())

        try:
            await super(DockerAgent, self).run()
        except:
            await self._end_clean()
            raise
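A minimal, hypothetical launch sketch for this agent. In practice INGInious starts it through its own entry points; the backend address, friendly name, concurrency and task directory below are placeholders, and `LocalFSProvider` is assumed to be the tasks filesystem implementation in use.

# Hypothetical standalone launch of the Docker agent (placeholder values throughout).
import asyncio

import zmq.asyncio

from inginious.agent.docker_agent import DockerAgent
from inginious.common.filesystems.local import LocalFSProvider

if __name__ == "__main__":
    context = zmq.asyncio.Context()
    agent = DockerAgent(context, "tcp://127.0.0.1:2222", "my-docker-agent",
                        concurrency=4, tasks_fs=LocalFSProvider("./tasks"))
    asyncio.get_event_loop().run_until_complete(agent.run())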