ZeroMQ-based python message broker server/client
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

106 lines
3.6 KiB

import zmq
from threading import Semaphore
from time import sleep, time
class PublishSetupException(Exception):
pass
class MsgbusSubClient(object):
def __init__(self, host, port=None, pubport=None):
self.host = host
self.port = port
self.pubport = pubport
self._ctx = None # ZMQ context
self.sub_socket = None # listener sockets
self.subscriptions = []
self.pub_socket = None # publisher socket
self.lock = Semaphore(1)
# self.connect()
def close(self):
if self.sub_socket:
self.sub_socket.close()
if self.pub_socket:
self.pub_socket.close()
if self.ctx:
self.ctx.destroy()
@property
def ctx(self):
if not self._ctx:
self._ctx = zmq.Context()
return self._ctx
def connect(self):
if self.port and not self.sub_socket:
self.connect_sub(self.host, self.port)
if self.pubport and not self.pub_socket:
self.connect_pub(self.host, self.pubport)
def connect_sub(self, host, port):
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.connect("tcp://{}:{}".format(host, port))
def connect_pub(self, host, port):
pub_socket = self.ctx.socket(zmq.PUB)
pub_socket.connect("tcp://{}:{}".format(host, port))
self.pub_socket = pub_socket
def sub(self, channel=None):
if channel is None:
channel = ''
assert type(channel) is str
if not self.sub_socket:
self.connect_sub(self.host, self.port)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8"))
self.subscriptions.append(channel)
def unsub(self, channel):
assert type(channel) is str
if channel in self.subscriptions:
self.subscriptions.remove(channel)
self.sub_socket.setsockopt(zmq.UNSUBSCRIBE, channel.encode("utf-8"))
def recv(self, decode=True, block=True):
recv_args = (zmq.NOBLOCK, ) if not block else ()
message = self.sub_socket.recv(*recv_args)
channel, body = message.split(b' ', 1)
return channel.decode("utf-8"), (body.decode('utf-8') if decode else body)
def _setup_publish_socket(self, timeout=5):
start = time()
try:
self.sub("__msgbus_meta")
while not timeout or time() < start + timeout:
try:
channel, message = self.recv(block=False)
except zmq.error.Again:
sleep(0.01)
continue
if channel != "__msgbus_meta":
continue
meta, args = message.split(" ", 1)
if meta != "__my_info":
continue
server_name, subport, subproto, pubport, pubproto = args.split(" ")
self.pubport = subport
self.connect_pub(self.host, self.pubport)
return
raise PublishSetupException("Could not establish publisher socket")
finally:
self.unsub("__msgbus_meta")
def pub(self, channel, message, encode_msg=True, settle=True, timeout=5):
if encode_msg:
message = message.encode("utf-8")
if not self.pub_socket:
with self.lock:
self.prepare_pub(timeout=timeout, settle=settle)
self.pub_socket.send(channel.encode("utf-8") + b' ' + message)
def prepare_pub(self, timeout=5, settle=True):
self._setup_publish_socket(timeout)
if settle:
sleep(1)