Autour du codeDevelopper toujours mieux
Posté le

Le patron de conception Reactor

Le patron de conception Reactor est la base des boucle d'évènement utilisé dans les frameworks asynchrones. Il permet de notifier des actions lorsqu'une ressource est prête à être utilisée. Une ressource peut par exemple être un fichier et une action l'affichage du contenu du fichier lorsqu'il sera prêt à être lu.

Structure du design pattern reactor

Ce patron de conception fait collaborer les quatre classes suivantes:

Handle

Cette classe est un adaptateur pour les ressources du système d'exploitation tel que les fichiers ou les sockets. Elle permet d'avoir une interface commune a toutes les ressources.

Synchronous Event Demultiplexer

Cette classe est un adaptateur à la librairie de démultiplexage d'entrée sortie du système d'exploitation. Le rôle de cette classe est d'offrir une interface pour surveiller un ensemble de Handle et d'indiquer quel Handles est près à subir immédiatement des opérations sans que cela ne bloque le système. Lorsqu'un handle est près, cette classe en informe l'Initiation Dispatcher

exemple: Si un mail est un Handle, Synchronous Event Demultiplexer pourra surveiller ce mail et me dire lorsqu'il arrive. Je pourrai alors lire le mail immédiatement sans bloquer le système.

Initiation Dispatcher

Cette classe enregistre les Event Handler via sa méthode register_handler. Elle doit pouvoir associer chaque Event Handler à l'Handle qu'il contient. Cette classe possède une méthode handle_events qui utilise le Synchronous Event Demultiplexer pour ce mettre en pause jusqu'à ce qu'un Handle soit prêt. Lorsqu'un Handle est prêt, elle déclenche l'Event Handler correspondant.

Event Handler

Cette classe servira de classe de base à l'utilisateur pour définir des actions lorsqu'un Handle est prêt. Cela se fait en redéfinissant la méthode handle_event qui est appelée par l'Initiation Dispatcher lorsque le Handle est prêt.

Implementation naïve

Pour le SynchronousEventDemultiplexer, il est déjà implémenté sous python. Le module porte le nom de selectors et fait déjà office d'adaptateur, nous n'avons pas à nous soucier de savoir quelle librairie de multiplexage d'entrée-sortie et disponible sur l'OS selectors nous fournie une interface et se débrouille avec ce qu'il trouve.

La classe Selector enregistre des objets semblables à des fichiers (file-like) notre classe Handler aura donc une méthode fileno pour être compatible avec Selector. La classe InitiationDispatcher stocke les EventHandler dans un dictionnaire.

Voici une première implémentation naïve

from selectors import DefaultSelector as SynchronousEventDemultiplexer
from abc import abstractmethod, ABCMeta


class Handle(metaclass=ABCMeta):

    @abstractmethod
    def fileno():
        """
        return file descriptor
        """


class EventHandler(metaclass=ABCMeta):

    def __init__(self, handle):
        self.handle = handle

    def get_handle(self):
        return self.handle

    @abstractmethod
    def handle_event(self, type_event=None):
        raise NotImplementedError()


class InitiationDispatcher:
    def __init__(self, synchronous_event_demultiplexer):
        self.synchronous_event_demultiplexer = synchronous_event_demultiplexer
        self.handlers = {}  # {Handle: EventHandler, ...}

    def register_handler(self, event_handler, event_type):
        self.synchronous_event_demultiplexer.register(event_handler.get_handle(),
                                                      event_type)

        self.handlers[event_handler.get_handle()] = event_handler

    def handle_events(self):
        handles = self.synchronous_event_demultiplexer.select()
        for handle, event in handles:
            event_handler = self.handlers[handle.fileobj]
            event_handler.handle_event()

from reactor1 import Handle
from pathlib import Path
import os


class FifoHandle(Handle):
    def __init__(self, path_to_fifo):
        path = Path(path_to_fifo)
        if not path.is_fifo():
            raise ValueError('{} is not FIFO'.format(path_to_fifo))

        self.fifo = os.open(path_to_fifo, os.O_RDWR)

    def fileno(self):
        return self.fifo

    def read(self):
        reads = os.read(self.fifo, 2048)
        return reads.decode()

from reactor1 import EventHandler, SynchronousEventDemultiplexer, InitiationDispatcher
from selectors import EVENT_READ
from handles import FifoHandle


class PrintFifoEventHandler(EventHandler):

    def __init__(self, path_to_fifo):
        super().__init__(FifoHandle(path_to_fifo))

    def handle_event(self, type_event=None):
        print(self.handle.read())


if __name__ == '__main__':
    synchronous_event_demultiplexer = SynchronousEventDemultiplexer()
    initiation_dispatcher = InitiationDispatcher(synchronous_event_demultiplexer)
    fifo_event = PrintFifoEventHandler('/tmp/my_fifo')
    initiation_dispatcher.register_handler(fifo_event, EVENT_READ)

    while True:
        initiation_dispatcher.handle_events()

On a créé dans le fichier handles.py un Handle appelé FifoHandle pour manipuler les FIFO. Un FIFO est un fichier spécial qui permet à deux processus de dialoguer, le FIFO est visible comme un fichier, mais l'information n'est jamais stockée sur le disque et transite en mémoire. Pour créer un FIFO, on peut utiliser la commande mkfifo.

Puis on instancie un SynchronousEventDemultiplexer, un InitiationDispatcher et un PrintFifoEventHandler que l'on enregistre via la méthode register_handler.

Avant de lancer le programme, il faudra créer le FIFO avec la commande

mkfifo /tmp/my_fifo

On pourra une fois le programme lancé écrire dans le FIFO:

echo Toto > /tmp/my_fifo

Implementation un peu plus pythonique

La méthode Selector.select retourne une liste d'objets de type SelectorKey qui possèdent les attributs suivant :

  • fileobj: Contient notre file-like objet.
  • fd: Contiens le descripteur de fichier.
  • events: L'évènement qui s'est produit (lecture / écriture)
  • data: Un objet que l'utilisateur a fourni en troisième paramètre lors de l'appel à la méthode Selector.register

Une des simplifications possibles est donc d'utiliser l'attribut data pour stocker l'EventHandler, nous n'avons donc plus à maintenir le dictionnaire {Handle: EventHandler}

Gestion des coroutines

Une coroutine est une fonction dont l'exécution peut être suspendue est repris. En python les coroutines sont des objets qui possède une méthode send(). On doit appeler cette méthode en lui passant None. Cela est dû au fait que historiquement, l'implémentation des coroutines est basée sur celle des generators. La méthode send retourne None ou lance une exception de type StopIteration contenant le résultat. En pyton3.5, on peut chaîner les coroutine en utilisant le mot-clé await comme ceci:

async def coroutine_func_1():
    return 1

async def coroutine_func_2():
    a = await coroutine_func_1()
    b = await coroutine_func_1()
    return a + b

coroutine = coroutine_func_2()
coroutine.send(None)  #  StopIteration: 2

Le mot clè await peut également être suivi d'un object awaitable, qui est un objet qui doit définir la méthode __await__. Cette méthode doit retourner un iterator dont la méthode __next__ revoie None ou le résultat dans uneStopIteration. On va donc créer un awaitable qui retournera un iterator qui sera un EventHandler. lorsque la méthode handle_event sera appelée, elle indiquera à l'objet qu'au prochain appele de __next__, il peut lancer un StopIteration pour donner le résultat et de désinscrire le handle de l'initiation_dispatcher

class FifoReader:

    class FifoReaderIterator(EventHandler):
        def __init__(self, path_to_fifo, initiation_dispatcher):
            self.handle = FifoHandle(path_to_fifo)
            self._initiation_dispatcher = initiation_dispatcher
            self._ready = False
            self._data = None
            self._initiation_dispatcher.register_handler(self, EVENT_READ)

        def __next__(self):
            if self._ready:
                raise StopIteration(self._data)
            return None

        def handle_event(self):
            self._ready = True
            self._data = self.handle.read()
            self._initiation_dispatcher.remove_handler(self)

    def __init__(self, path_to_fifo, initiation_dispatcher):
        self._iterator = self.FifoReaderIterator(path_to_fifo,
                                                 initiation_dispatcher)

    def __await__(self):
        return self._iterator

On poura donc utiliser la classe FifoReader dans une coroutine comme ceci pour concaténer la sortie de deux FIFO:

async def concat():
    a = await FifoReader('/tmp/my_fifo_1',
                         initiation_dispatcher)
    b = await FifoReader('/tmp/my_fifo_2',
                         initiation_dispatcher)
    print(a.strip() + b.strip())

On va ensuite modifier l'InitiationDispatcher. Afin qu'il puisse gérer les coroutines il va posséder une méthode add_coroutine qui tentera d'executer la coroutine et la mettra dans une liste si elle retourneNone. La méthode handle_events après avoir géré les handles tentera d'exécuter les coroutines et ne conservera que celle qui n'ont pas lancé de StopIteration.

from selectors import DefaultSelector as SynchronousEventDemultiplexer
from selectors import EVENT_READ
from abc import abstractmethod, ABCMeta


class Handle(metaclass=ABCMeta):

    @abstractmethod
    def fileno():
        """
        return file descriptor
        """


class EventHandler:

    def __init__(self, handle):
        self.handle = handle

    def get_handle(self):
        return self.handle

    def handle_event(self, type_event=None):
        raise NotImplementedError()


class InitiationDispatcher:
    def __init__(self, synchronous_event_demultiplexer):
        self.synchronous_event_demultiplexer = synchronous_event_demultiplexer
        self._coroutines = []

    def register_handler(self, event_handler, event_type):
        self.synchronous_event_demultiplexer.register(event_handler.get_handle(),
                                                      event_type,
                                                      event_handler)

    def remove_handler(self, event_handler):
        self.synchronous_event_demultiplexer.unregister(event_handler.get_handle())


    def add_coroutine(self, coroutine):
        try:
            coroutine.send(None)
        except StopIteration:
            pass
        else:
            self._coroutines.append(coroutine)

    def handle_events(self):
        handles = self.synchronous_event_demultiplexer.select()
        for handle, event in handles:
            handle.data.handle_event()

        not_executed = []
        for coroutine in self._coroutines:
            try:
                coroutine.send(None)
            except StopIteration:
                pass
            else:
                not_executed.append(coroutine)

        self._coroutines = not_executed

from reactor import EventHandler, SynchronousEventDemultiplexer, InitiationDispatcher
from selectors import EVENT_READ
from handles import FifoHandle


class PrintFifoEventHandler(EventHandler):

    def __init__(self, path_to_fifo):
        super().__init__(FifoHandle(path_to_fifo))

    def handle_event(self, type_event=None):
        print(self.handle.read())


class FifoReader:

    class FifoReaderIterator(EventHandler):
        def __init__(self, path_to_fifo, initiation_dispatcher):
            self.handle = FifoHandle(path_to_fifo)
            self._initiation_dispatcher = initiation_dispatcher
            self._ready = False
            self._data = None
            self._initiation_dispatcher.register_handler(self, EVENT_READ)

        def __next__(self):
            if self._ready:
                raise StopIteration(self._data)
            return None

        def handle_event(self):
            self._ready = True
            self._data = self.handle.read()
            self._initiation_dispatcher.remove_handler(self)

    def __init__(self, path_to_fifo, initiation_dispatcher):
        self._iterator = self.FifoReaderIterator(path_to_fifo,
                                                 initiation_dispatcher)

    def __await__(self):
        return self._iterator


async def concat():
    a = await FifoReader('/tmp/my_fifo_1',
                         initiation_dispatcher)
    b = await FifoReader('/tmp/my_fifo_2',
                         initiation_dispatcher)
    print(a.strip() + b.strip())



if __name__ == '__main__':
    synchronous_event_demultiplexer = SynchronousEventDemultiplexer()
    initiation_dispatcher = InitiationDispatcher(synchronous_event_demultiplexer)
    initiation_dispatcher.add_coroutine(concat())

    fifo_event = PrintFifoEventHandler('/tmp/my_fifo')
    initiation_dispatcher.register_handler(fifo_event, EVENT_READ)


    while True:
        initiation_dispatcher.handle_events()

Créez deux FIFO via la commande mkfifo:

mkfifo /tmp/my_fifo_1 /tmp/my_fifo_2

Après avoir lancé le main.py

python3.5 main.py

On peut utiliser un autre terminal pour écrire dans les FIFO.

echo 12 > /tmp/my_fifo_1

echo toto > /tmp/my_fifo

echo 34 > /tmp/my_fifo_2

Frôllons une boucle d'événements

Ce que l'on vient de développer est très proche d'une boucle d'évènement telle que dans le framework AsyncIO ou Tornado Il faudrait que l'InitiationDispatcher soit encapsulé dans un singleton, pour qu'il puisse être appelé facilement dans plusieurs modules et également que l'on puisse gérer l'exécution de fonctions à un moment donnés. Pour implémenter cette fonctionnalité, on va utiliser le paramètre, optionnel, timeout de select qui permet de dire à select, bloque tant qu'un fichier n'est pas près à être lu ou que le temps donné par timeout est écoulé.

On va ajouter une méthode call_later à notre InitiationDispatcher pour stoker les fonctions à exécuter ainsi que le moment auquel il doit les exécuter dans un tas. Les fonctions qui doivent être exécutée en première seront au sommet du tas. La méthode handle_events va regarder s'il y a une fonction en attente au sommet du tas et récupérer le moment auquel elle doit l'exécute. Elle va utiliser ce moment pour initialiser le paramètre timeout de select. Lorsque select rend la main, on ne sais pas si c'est à cause d'un timeout, d'un handler qui est près ou les deux. Il faut vérifier s'il est temps d'exécuter la fonction au sommet du tas puis la retirer de ce tas si c'est le cas.

class InitiationDispatcher:
    def __init__(self, synchronous_event_demultiplexer):
        self.synchronous_event_demultiplexer = synchronous_event_demultiplexer
        self._coroutines = []
        self._heap = []

    def register_handler(self, event_handler, event_type):
        self.synchronous_event_demultiplexer.register(event_handler.get_handle(),
                                                      event_type,
                                                      event_handler)

    def remove_handler(self, event_handler):
        self.synchronous_event_demultiplexer.unregister(event_handler.get_handle())


    def add_coroutine(self, coroutine):
        try:
            coroutine.send(None)
        except StopIteration:
            pass
        else:
            self._coroutines.append(coroutine)

    def call_later(self, delay, func):
        heappush(self._heap, [time() + delay, func])

    def handle_events(self):

        if self._heap:
            moment = self._heap[0][0]
            timeout = moment - time()
        else:
            moment = None
            timeout = None

        handles = self.synchronous_event_demultiplexer.select(timeout)
        for handle, event in handles:
            handle.data.handle_event()

        if moment is not None and moment <= time():
            heappop(self._heap)[1]()

        not_executed = []
        for coroutine in self._coroutines:
            try:
                coroutine.send(None)
            except StopIteration:
                pass
            else:
                not_executed.append(coroutine)

        self._coroutines = not_executed

Voilà, j'espère que ça vous a plu, et que les boucles d'évènement vous paraîtront un peu moins magiques. laughing

A vos IDE et bon code.