Source code for inginious.common.message_meta

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
import inspect

import msgpack


class MessageMeta(type):
    """
    A MetaClass for messages.

    Provides message checking on both sides of the communication.

    Each class using this MetaClass MUST have an __init__ function that takes only arguments that are type-hinted,
    and that ONLY assigns each argument to self, under the SAME name.

    Moreover, the class must give a `msgtype` argument to the metaclass; it is the name under which the message is
    registered and that is looked up when a dumped message is parsed.

    Example:

        class SendNumberToContainer(metaclass=MessageMeta, msgtype="send_nbr_container"):
            def __init__(self, container_id: str, a_number: int):
                self.container_id = container_id
                self.a_number = a_number
    """

    # Maps msgtype -> message class. Filled by __init__ below; a later class reusing a msgtype replaces the earlier one.
    _registered_messages = {}
    # When True, _verify() checks messages even when not called with force=True.
    DEBUG = True

    def __new__(cls, name, bases, namespace, **kargs):  # pylint: disable=unused-argument
        # The extra keyword arguments (msgtype) are consumed by __init__; type.__new__ must not receive them.
        return super().__new__(cls, name, bases, namespace)

    @classmethod
    def load(cls, bmessage):
        """
        From a bytestring given by a (distant) call to Message.dump(), retrieve the original message

        :param bmessage: bytestring given by a .dump() call on a message
        :return: the original message
        :raises TypeError: if the message type is unknown, or if the content does not match the message definition
        """
        # raw=False decodes msgpack strings to str. The older `encoding="utf8"` keyword is rejected by msgpack >= 1.0.
        message_dict = msgpack.loads(bmessage, raw=False, use_list=False)

        try:
            message_class = MessageMeta._registered_messages[message_dict["type"]]
            # Bypass __init__: the fields come straight from the decoded dict.
            obj = message_class.__new__(message_class)
            object.__setattr__(obj, "__dict__", message_dict)
        except (KeyError, TypeError):
            # KeyError: no "type" key, or type not registered. TypeError: decoded payload is not a usable dict.
            raise TypeError("Unknown message type") from None

        if not obj._verify():  # pylint: disable=protected-access
            raise TypeError("Invalid message content")
        return obj

    def __init__(cls, name, bases, attrs, msgtype):
        """
        Ensure that the new class
        - Provides immutable objects
        - Respects the contract over __init__
        - Verifies that the expected fields are present
        - Has a .dump() function

        :param msgtype: name under which the class is registered
        :raises TypeError: if __init__ has a parameter named "type" or a parameter without annotation
        """
        old_init = cls.__init__
        old_setattr = cls.__setattr__
        old_delattr = cls.__delattr__

        signature = inspect.signature(old_init)
        parameters = signature.parameters.copy()
        del parameters["self"]  # self is not a real parameter

        # type is a reserved field
        if "type" in parameters:
            raise TypeError("'type' is reserved in messages, use another key")

        # check that all parameters have annotations
        for field in parameters:
            if parameters[field].annotation is inspect.Parameter.empty:
                raise TypeError("All types should be annotated")

        MessageMeta._registered_messages[msgtype] = cls

        def new_init(self, *args, **kwargs):
            # Resolve positional arguments, keyword arguments and defaults to the expected message content.
            # bind() raises the same TypeError as the call itself would for a bad argument list.
            bound = signature.bind(self, *args, **kwargs)
            bound.apply_defaults()
            message_content = dict(bound.arguments)
            del message_content["self"]

            object.__setattr__(self, "__currently_mutable", True)
            try:
                # Ask the init function to fill __dict__ itself
                old_init(self, *args, **kwargs)
            finally:
                object.__delattr__(self, "__currently_mutable")

            # Verify that dict has been filled correctly
            if self.__dict__ != message_content:
                raise TypeError("__init__ does not fullfill the contract of messages. All fields must be init in the object and have the same value "
                                "and name than in the parameters")

            # Do not forget to add message name now
            object.__setattr__(self, "type", msgtype)

        def new_delattr(self, name):
            # Deletion is only allowed while the wrapped __init__ is running
            if "__currently_mutable" in self.__dict__:
                old_delattr(self, name)
            else:
                raise TypeError("Immutable object")

        def new_setattr(self, name, value):
            # Assignment is only allowed while the wrapped __init__ is running
            if "__currently_mutable" in self.__dict__:
                old_setattr(self, name, value)
            else:
                raise TypeError("Immutable object")

        needed_keys = set(parameters.keys()) | {"type"}

        def _verify(self, force=False):
            """
            Ensure this message is consistent with its definition: the set of fields must be exactly the __init__
            parameters plus "type", and "type" must be the registered msgtype. Field values are not inspected.
            Verifies only if force or MessageMeta.DEBUG is True
            :param force: verify even if MessageMeta.DEBUG is False
            :return: True if correct (or if nothing was verified), False else
            """
            if force or MessageMeta.DEBUG:
                content_present = set(self.__dict__.keys()) == needed_keys
                type_ok = self.type == msgtype
                return content_present and type_ok
            return True

        def dump(self):
            """
            :return: a bytestring containing a black-box representation of the message, that can be loaded using MessageMeta.load.
            """
            # The `encoding` keyword is rejected by msgpack >= 1.0; str is packed as UTF-8 without it.
            return msgpack.dumps(self.__dict__, use_bin_type=True)

        super().__init__(name, bases, attrs)
        cls.__init__ = new_init
        cls.__delattr__ = new_delattr
        cls.__setattr__ = new_setattr
        cls._verify = _verify
        cls.dump = dump
        cls.__msgtype__ = msgtype
def run_tests():
    """
    Manual smoke test for MessageMeta: instantiation, dump/load round trip, immutability and rejection of
    invalid dumps. Results are printed; only the instantiation and round-trip checks are hard assertions.
    """

    class StartContainer(metaclass=MessageMeta, msgtype="start_container"):
        def __init__(self, job_id: str, container_name: str):
            self.job_id = job_id
            self.container_name = container_name

    class KillContainer(metaclass=MessageMeta, msgtype="kill_container"):
        def __init__(self, container_id: str):
            self.container_id = container_id

    print("----------------- Verify basic instantiation")
    obj = StartContainer("test", "test2")
    print(obj.job_id)
    assert obj.job_id == "test"
    print(obj.container_name)
    assert obj.container_name == "test2"
    print()

    print("----------------- Verify basic instantiation(2)")
    obj2 = KillContainer("test3")
    print(type(obj2))
    assert type(obj2) is KillContainer
    print(obj2.container_id)
    assert obj2.container_id == "test3"
    print()

    print("----------------- Dump test")
    obj2_dump = obj2.dump()  # pylint: disable=no-member
    print(obj2_dump)
    print()

    print("----------------- Load test")
    obj3 = MessageMeta.load(obj2_dump)
    print(type(obj3))
    assert type(obj3) is KillContainer
    print(obj3.container_id)
    assert obj3.container_id == "test3"
    print()

    print("----------------- Assignation test")
    try:
        obj3.x = "a"
        print("does not work")
    except TypeError as e:  # messages raise TypeError("Immutable object") on assignment
        print(e)
        print("(works)")
    print()

    print("----------------- Invalid dump 1 (invalid type)")
    try:
        invalid_dump1 = b'\x82\xaccontainer_id\x01\xa4type\xaekill_containeI'
        obj4 = MessageMeta.load(invalid_dump1)
        print(type(obj4))
        print(obj4.container_id)
        print("does not work")
    except TypeError as e:
        print(e)
        print("(works)")
    print()

    print("----------------- Invalid dump 2 (invalid fields)")
    try:
        invalid_dump2 = b'\x82\xaccontainer_iI\x01\xa4type\xaekill_container'
        obj5 = MessageMeta.load(invalid_dump2)
        print(type(obj5))
        print(obj5.container_id)
        print("does not work")
    except TypeError as e:  # load() reports wrong fields as TypeError("Invalid message content"), not KeyError
        print(e)
        print("(works)")
    print()

    print("----------------- Invalid dump 3 (invalid content)")
    # NOTE(review): MessageMeta._verify only compares field names and the message type, not value types,
    # so this case is expected to print "does not work" until value-type checking exists.
    try:
        # No `encoding` keyword: msgpack >= 1.0 rejects it with a TypeError that would be mistaken for a pass here.
        invalid_dump3 = msgpack.dumps({"type": "kill_container", "container_id": 2})
        obj6 = MessageMeta.load(invalid_dump3)
        print(type(obj6))
        print(obj6.container_id)
        print("does not work")
    except TypeError as e:
        print(e)
        print("(works)")
    print()
class ZMQUtils:
    """
    Utilities that do serializing/unserializing of messages (whose metaclass is MessageMeta).

    Every method expects `socket` to expose awaitable `recv_multipart()` / `send_multipart()` methods
    (presumably a zmq.asyncio socket -- confirm against callers).
    """

    @classmethod
    async def recv_with_addr(cls, socket):
        """
        Receive a two-frame message [addr, dumped message].

        :return: a tuple (addr, message object), addr being the first frame as received
        """
        message = await socket.recv_multipart()
        addr = message[0]
        obj = MessageMeta.load(message[1])
        return addr, obj

    @classmethod
    async def send_with_addr(cls, socket, addr: bytes, obj):
        """
        Send `obj` as a two-frame message [addr, obj.dump()].

        :param addr: first frame of the message
        :param obj: object providing a .dump() method
        """
        message = [addr, obj.dump()]
        await socket.send_multipart(message)

    @classmethod
    async def recv(cls, socket, skip_first=False):
        """
        Receive a message and load it with MessageMeta.load.

        :param skip_first: if True, the message is read from the second frame instead of the first one
        :return: the message object
        """
        message = await socket.recv_multipart()
        return MessageMeta.load(message[1] if skip_first else message[0])

    @classmethod
    async def send(cls, socket, obj, send_white=False):
        """
        Send `obj.dump()` on the socket.

        :param obj: object providing a .dump() method
        :param send_white: if True, an empty frame is sent before the message frame
        """
        message_obj = obj.dump()
        # The empty frame must be bytes: multipart frames have to be bytes-like, a str frame is rejected.
        frames = [b"", message_obj] if send_white else [message_obj]
        await socket.send_multipart(frames)