Autour du codeDevelopper toujours mieux
Posté le

Le design pattern observer et ses variantes

L'observer est un design pattern dans lequel un objet l'observer va s'abonner à un autre objet, le sujet, et attendre que celui-ci le notifie d'un événement. Rien à voir avec le voyeurisme.

Un événement est généralement un changement d'états de l'objet observé. L'objet observé est appelé le sujet ou l'observable. Un observable peut rattacher plusieurs observer à lui-même.

Voici la classe observable, elle possède une méthode add_observer pour lier un observer et une méthode notify_observer qui lorsqu'elle est appelée exécute la méthode notify de tout les observers liés.

class Observable:
    def __init__(self):
        self.observers = []

    def notify_observers(self):
        for obs in self.observers:
            obs.notify(self)

    def add_observer(self, obs):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers.append(obs)

class Observable(object):
    def __init__(self):
        self.observers = []

    def notify_observers(self):
        for obs in self.observers:
            obs.notify(self)

    def add_observer(self, obs):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers.append(obs)

Pour vous montrer un exemple d'utilisation, on va créer deux observers, Speed qui affiche la vitesse d'une voiture et Rpm qui affiche les rotations par minute du moteur.

class Speed:
    def notify(self, subject):
        print("speed", subject.speed)

class Rpm:
    def notify(self, subject):
        print("rpm", subject.rpm)

class Speed(object):
    def notify(self, subject):
        print "speed", subject.speed

class Rpm(object):
    def notify(self, subject):
        print "rpm", subject.rpm

La classe voiture va hériter de Observable et va appeler la méthode notify_observers à chaque changement de rpm via la méthode accelerate ou de vitesse via les méthode downshift et upshift.

class Car(Observable):
    def __init__(self):
        Observable.__init__(self)
        self.speed = 0
        self.rpm = 0
        self.gearbox = [.016, 0.038, .038, .042]
        self.gearbox_speed  = 0

    def accelerate(self):
        self.rpm += 1000
        self.speed = self.rpm * self.gearbox[self.gearbox_speed]
        self.notify_observers()

    def downshift(self):
        if self.gearbox_speed > 0:
            self.gearbox_speed -= 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers()

    def upshift(self):
        if self.gearbox_speed < 3:
            self.gearbox_speed += 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers()

class Car(Observable):
    def __init__(self):
        Observable.__init__(self)
        self.speed = 0
        self.rpm = 0
        self.gearbox = [.016, 0.038, .038, .042]
        self.gearbox_speed  = 0

    def accelerate(self):
        self.rpm += 1000
        self.speed = self.rpm * self.gearbox[self.gearbox_speed]
        self.notify_observers()

    def downshift(self):
        if self.gearbox_speed > 0:
            self.gearbox_speed -= 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers()

    def upshift(self):
        if self.gearbox_speed < 3:
            self.gearbox_speed += 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers()

Voici comment ça s'utilise dans le code.

car = Car()
car.add_observer(Speed())
car.add_observer(Rpm())

car.accelerate()
car.upshift()
car.upshift()

car = Car()
car.add_observer(Speed())
car.add_observer(Rpm())

car.accelerate()
car.upshift()
car.upshift()

On peut faire plusieurs remarques.Les observeurs prennent en paramètre l'objet Car et vont chercher se dont il on besoin.On appelle ceci le mode pull qui est à l'opposer du modèle push où le sujet envoie des informations détaillées sur son état généralement dans un objet appelé Event.Lorsque l'on passe un rapport avec la méthode upshift seule la vitesse (speed) change; pourtant, tout les observer sont notifiés. Une solution et de rendre Observable plus complexe en lui permettant d'enregistrer un observer sur certain évènement comme un changement de rpm ou de vitesse.

Voici les modifications de la classe Observable pour que les notifications soie plus sélective.

class Observable:
    def __init__(self):
        self.observers = defaultdict(list) # Pour un évènement en clef,
                                           #  j'ai une liste d'observers.
    def notify_observers(self, event):
        for obs in self.observers[event]:
            obs.notify(self)

    def add_observer(self, obs, event):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers[event].append(obs)

class Car(Observable):
    def __init__(self):
        Observable.__init__(self)
        self.speed = 0
        self.rpm = 0
        self.gearbox = [.016, 0.038, .038, .042]
        self.gearbox_speed  = 0

    def accelerate(self):
        self.rpm += 1000
        self.speed = self.rpm * self.gearbox[self.gearbox_speed]
        self.notify_observers('rpm')
        self.notify_observers('speed')

    def downshift(self):
        if self.gearbox_speed > 0:
            self.gearbox_speed -= 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers('speed') # Seul les observer de la vitesse
                                           # sont notifier.
    def upshift(self):
        if self.gearbox_speed < 3:
            self.gearbox_speed += 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers('speed')


car = Car()
car.add_observer(Speed(), 'speed')
car.add_observer(Rpm(),'rpm')

from collections import defaultdict

class Observable(object):
    def __init__(self):
        self.observers = defaultdict(list) # Pour un évènement en clef,
                                           #  j'ai une liste d'observers.
    def notify_observers(self, event):
        for obs in self.observers[event]:
            obs.notify(self)

    def add_observer(self, obs, event):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers[event].append(obs)

class Car(Observable):
    def __init__(self):
        Observable.__init__(self)
        self.speed = 0
        self.rpm = 0
        self.gearbox = [.016, 0.038, .038, .042]
        self.gearbox_speed  = 0

    def accelerate(self):
        self.rpm += 1000
        self.speed = self.rpm * self.gearbox[self.gearbox_speed]
        self.notify_observers('rpm')
        self.notify_observers('speed')

    def downshift(self):
        if self.gearbox_speed > 0:
            self.gearbox_speed -= 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers('speed') # Seul les observer de la vitesse
                                           # sont notifier.
    def upshift(self):
        if self.gearbox_speed < 3:
            self.gearbox_speed += 1
            self.speed = self.rpm * self.gearbox[self.gearbox_speed]
            self.notify_observers('speed')


car = Car()
car.add_observer(Speed(), 'speed')
car.add_observer(Rpm(),'rpm')

Les observers threadés

On va s'attaquer à un problème plus complexe. Lorsque l'état d'un objet est modifié, ses observer vont mettre à jour une BDD, envoyer un mail ou uploader un fichier. Le problème et que si l'un des observer met beaucoup de temps à faire son travail, il va bloquer touts les autres. Pour éviter ça, on va utiliser des threads.

from time import sleep

class SendMail:
    def notify(self, subject):
        print('start send mail')
        sleep(1) # Simule un long envoie de mail.
        print('end send mail')

class UploadFile:
    def notify(self, subject):
        print('start upload')
        sleep(3) #  Simule l'upload d'un gros fichier.
        print('end upload')

from threading import Thread

class Observable:
    def __init__(self):
        self.observers = []

    def notify_observers(self):
        for obs in self.observers: # La notification se fait dans
                                   # un thread séparé.
            thread = Thread(target=obs.notify, args=(self,))
            thread.start()

    def add_observer(self, obs):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers.append(obs)


subject = Observable()
subject.add_observer(SendMail())
subject.add_observer(UploadFile())
subject.notify_observers()
subject.notify_observers()

from time import sleep

class SendMail(object):
    def notify(self, subject):
        print 'start send mail'
        sleep(1) # Simule un long envoie de mail.
        print 'end send mail'

class UploadFile(object):
    def notify(self, subject):
        print 'start upload'
        sleep(3) #  Simule l'upload d'un gros fichier.
        print 'end upload'

from threading import Thread

class Observable(object):
    def __init__(self):
        self.observers = []

    def notify_observers(self):
        for obs in self.observers: # La notification se fait dans
                                   # un thread séparé.
            thread = Thread(target=obs.notify, args=(self,))
            thread.start()

    def add_observer(self, obs):
        if not hasattr(obs, 'notify'):
            raise ValueError("First argument must be object with notify method")
        self.observers.append(obs)


subject = Observable()
subject.add_observer(SendMail())
subject.add_observer(UploadFile())
subject.notify_observers()
subject.notify_observers()

Voilà j'espère que ça vous a plu n'hésitez pas si vous avez des remarques d'ici là a vos IDE et bon code ! smile