PythonにおけるPub/Subパターンの実装

PythonにおけるPub/Subパターンの実装

Pub/Subパターンとは

Pub/Subパターンとは、イベント駆動型プログラミングのデザインパターンです。Publisher(発行者)が発行したイベントをBroker(仲介者)が取りまとめ、Subscriber(購読者)に伝達します。イベントはBrokerが管理しているため、PublisherとSubscriber同士は疎結合となります。

What is Pub-Sub? Rails Publish-Subscribe Pattern Tutorial | Toptalより

Observerパターンとの違い

同じイベント駆動のデザインパターンにObseverパターンがあります。

ObseverパターンとPub/Subパターンの大きな違いは、イベントをどこで管理するのかにあります。Obseverパターンは、観測者と観測対象が直接イベントのやり取りを行いますが、Pub/Subパターンでは、Event Chanelがイベントの管理を行い、イベントの発信者と購読者同士が直接やり取りを行うことはありません。

Observer vs Pub-Sub pattern | Hacker Noonより

実装例

Pub/Subパターンの実装例

class EventChannel(object):
    def __init__(self):
        self.subscribers = {}

    def unsubscribe(self, event, callback):
        if event is not None or event != ""\
                and event in self.subscribers.keys():
            self.subscribers[event] = list(
                filter(
                    lambda x: x is not callback,
                    self.subscribers[event]
                )
            )

    def subscribe(self, event, callback):
        if not callable(callback):
            raise ValueError("callback must be callable")

        if event is None or event == "":
            raise ValueError("Event cant be empty")

        if event not in self.subscribers.keys():
            self.subscribers[event] = [callback]
        else:
            self.subscribers[event].append(callback)

    def publish(self, event, args):
        if event in self.subscribers.keys():
            for callback in self.subscribers[event]:
                callback(args)


if __name__ == "__main__":
    event_channel = EventChannel()

    callback = lambda x: print(x)

    event_channel.subscribe("myevent", callback)

    event_channel.publish("myevent", "Hello, world!")

    # out: "Hello, world!"

    event_channel.unsubscribe("myevent", callback)

    event_channel.publish("myevent", "Hello, world!")

    # No output

Fast Pub-Sub python implementation: starting (I) – DEV一部改変

おすすめパッケージ

Pub/Subパターンを使用するなら、event-channelを利用するのがおすすめです。

event-channelには、スレッドなし、同期マルチスレッド、非同期マルチスレッド用のクラスが用意されています。

インストール

$ pip install event-channel

スレッドなし

スレッドなしでのイベント実行例

import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel

non_thread = EventChannel()

non_thread.subscribe("myevent", time.sleep)
non_thread.subscribe("myevent", time.sleep)
start = time.time()
non_thread.publish("myevent", 3)
end = time.time()

print("non threaded function elapsed time {0}".format(end - start))
#non threaded function elapsed time 6.0080871582

同期マルチスレッド

同期マルチスレッドでのイベント実行例

import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel

threaded = ThreadedEventChannel()

threaded.subscribe("myevent", time.sleep)
threaded.subscribe("myevent", time.sleep)
start = time.time()
threaded.publish("myevent", 3)
end = time.time()

print("threaded function elapsed time {0}".format(end - start))
# threaded function elapsed time 3.00581121445

非同期マルチスレッド

非同期マルチスレッドでのイベント実行例

import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel

non_blocking_threaded = ThreadedEventChannel(blocking=False)

non_blocking_threaded.subscribe("myevent", time.sleep)
non_blocking_threaded.subscribe("myevent", time.sleep)
start = time.time()
non_blocking_threaded.publish("myevent", 3)
end = time.time()

print("threaded function non blocking elapsed time {0}".format(end - start))
# threaded function non blocking elapsed time 0.00333380699158

参考サイト