PythonにおけるPub/Subパターンの実装
目次
Pub/Subパターンとは
Pub/Subパターンとは、イベント駆動型プログラミングのデザインパターンです。Publisher(発行者)が発行したイベントをBroker(仲介者)が取りまとめ、Subscriber(購読者)に伝達します。イベントはBrokerが管理しているため、PublisherとSubscriber同士は疎結合となります。
Observerパターンとの違い
同じイベント駆動のデザインパターンにObseverパターンがあります。
ObseverパターンとPub/Subパターンの大きな違いは、イベントをどこで管理するのかにあります。Obseverパターンは、観測者と観測対象が直接イベントのやり取りを行いますが、Pub/Subパターンでは、Event Chanelがイベントの管理を行い、イベントの発信者と購読者同士が直接やり取りを行うことはありません。
実装例
Pub/Subパターンの実装例
class EventChannel(object):
def __init__(self):
self.subscribers = {}
def unsubscribe(self, event, callback):
if event is not None or event != ""\
and event in self.subscribers.keys():
self.subscribers[event] = list(
filter(
lambda x: x is not callback,
self.subscribers[event]
)
)
def subscribe(self, event, callback):
if not callable(callback):
raise ValueError("callback must be callable")
if event is None or event == "":
raise ValueError("Event cant be empty")
if event not in self.subscribers.keys():
self.subscribers[event] = [callback]
else:
self.subscribers[event].append(callback)
def publish(self, event, args):
if event in self.subscribers.keys():
for callback in self.subscribers[event]:
callback(args)
if __name__ == "__main__":
event_channel = EventChannel()
callback = lambda x: print(x)
event_channel.subscribe("myevent", callback)
event_channel.publish("myevent", "Hello, world!")
# out: "Hello, world!"
event_channel.unsubscribe("myevent", callback)
event_channel.publish("myevent", "Hello, world!")
# No output
Fast Pub-Sub python implementation: starting (I) – DEV一部改変
おすすめパッケージ
Pub/Subパターンを使用するなら、event-channelを利用するのがおすすめです。
event-channelには、スレッドなし、同期マルチスレッド、非同期マルチスレッド用のクラスが用意されています。
インストール
$ pip install event-channel
スレッドなし
スレッドなしでのイベント実行例
import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel
non_thread = EventChannel()
non_thread.subscribe("myevent", time.sleep)
non_thread.subscribe("myevent", time.sleep)
start = time.time()
non_thread.publish("myevent", 3)
end = time.time()
print("non threaded function elapsed time {0}".format(end - start))
#non threaded function elapsed time 6.0080871582
同期マルチスレッド
同期マルチスレッドでのイベント実行例
import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel
threaded = ThreadedEventChannel()
threaded.subscribe("myevent", time.sleep)
threaded.subscribe("myevent", time.sleep)
start = time.time()
threaded.publish("myevent", 3)
end = time.time()
print("threaded function elapsed time {0}".format(end - start))
# threaded function elapsed time 3.00581121445
非同期マルチスレッド
非同期マルチスレッドでのイベント実行例
import time
from event_channel.event_channel import EventChannel
from event_channel.threaded_event_channel import ThreadedEventChannel
non_blocking_threaded = ThreadedEventChannel(blocking=False)
non_blocking_threaded.subscribe("myevent", time.sleep)
non_blocking_threaded.subscribe("myevent", time.sleep)
start = time.time()
non_blocking_threaded.publish("myevent", 3)
end = time.time()
print("threaded function non blocking elapsed time {0}".format(end - start))
# threaded function non blocking elapsed time 0.00333380699158