2017-09-17 14:29:23 -07:00
|
|
|
import zmq
|
|
|
|
from threading import Semaphore
|
|
|
|
from time import sleep, time
|
|
|
|
|
|
|
|
|
|
|
|
class PublishSetupException(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class MsgbusSubClient(object):
|
2017-09-17 14:33:29 -07:00
|
|
|
def __init__(self, host, port=None, pubport=None):
|
2017-09-17 14:29:23 -07:00
|
|
|
self.host = host
|
|
|
|
self.port = port
|
2017-09-17 14:33:29 -07:00
|
|
|
self.pubport = pubport
|
|
|
|
self._ctx = None # ZMQ context
|
2017-09-17 14:29:23 -07:00
|
|
|
self.sub_socket = None # listener sockets
|
|
|
|
self.subscriptions = []
|
|
|
|
self.pub_socket = None # publisher socket
|
|
|
|
self.lock = Semaphore(1)
|
2017-09-17 14:33:29 -07:00
|
|
|
# self.connect()
|
2017-09-17 14:29:23 -07:00
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if self.sub_socket:
|
|
|
|
self.sub_socket.close()
|
|
|
|
if self.pub_socket:
|
|
|
|
self.pub_socket.close()
|
|
|
|
if self.ctx:
|
|
|
|
self.ctx.destroy()
|
|
|
|
|
2017-09-17 14:33:29 -07:00
|
|
|
@property
|
|
|
|
def ctx(self):
|
|
|
|
if not self._ctx:
|
|
|
|
self._ctx = zmq.Context()
|
|
|
|
return self._ctx
|
|
|
|
|
2017-09-17 14:29:23 -07:00
|
|
|
def connect(self):
|
2017-09-17 14:33:29 -07:00
|
|
|
if self.port and not self.sub_socket:
|
|
|
|
self.connect_sub(self.host, self.port)
|
|
|
|
if self.pubport and not self.pub_socket:
|
|
|
|
self.connect_pub(self.host, self.pubport)
|
|
|
|
|
|
|
|
def connect_sub(self, host, port):
|
2017-09-17 14:29:23 -07:00
|
|
|
self.sub_socket = self.ctx.socket(zmq.SUB)
|
2017-09-17 14:33:29 -07:00
|
|
|
self.sub_socket.connect("tcp://{}:{}".format(host, port))
|
|
|
|
|
|
|
|
def connect_pub(self, host, port):
|
|
|
|
pub_socket = self.ctx.socket(zmq.PUB)
|
|
|
|
pub_socket.connect("tcp://{}:{}".format(host, port))
|
|
|
|
self.pub_socket = pub_socket
|
2017-09-17 14:29:23 -07:00
|
|
|
|
|
|
|
def sub(self, channel=None):
|
|
|
|
if channel is None:
|
|
|
|
channel = ''
|
|
|
|
assert type(channel) is str
|
2017-09-17 14:33:29 -07:00
|
|
|
if not self.sub_socket:
|
|
|
|
self.connect_sub(self.host, self.port)
|
2017-09-17 14:29:23 -07:00
|
|
|
self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8"))
|
|
|
|
self.subscriptions.append(channel)
|
|
|
|
|
|
|
|
def unsub(self, channel):
|
|
|
|
assert type(channel) is str
|
|
|
|
if channel in self.subscriptions:
|
|
|
|
self.subscriptions.remove(channel)
|
|
|
|
self.sub_socket.setsockopt(zmq.UNSUBSCRIBE, channel.encode("utf-8"))
|
|
|
|
|
|
|
|
def recv(self, decode=True, block=True):
|
|
|
|
recv_args = (zmq.NOBLOCK, ) if not block else ()
|
|
|
|
message = self.sub_socket.recv(*recv_args)
|
|
|
|
channel, body = message.split(b' ', 1)
|
|
|
|
return channel.decode("utf-8"), (body.decode('utf-8') if decode else body)
|
|
|
|
|
|
|
|
def _setup_publish_socket(self, timeout=5):
|
|
|
|
start = time()
|
|
|
|
try:
|
|
|
|
self.sub("__msgbus_meta")
|
|
|
|
while not timeout or time() < start + timeout:
|
|
|
|
try:
|
|
|
|
channel, message = self.recv(block=False)
|
|
|
|
except zmq.error.Again:
|
|
|
|
sleep(0.01)
|
|
|
|
continue
|
|
|
|
if channel != "__msgbus_meta":
|
|
|
|
continue
|
|
|
|
meta, args = message.split(" ", 1)
|
|
|
|
if meta != "__my_info":
|
|
|
|
continue
|
2017-09-17 14:33:29 -07:00
|
|
|
server_name, subport, subproto, pubport, pubproto = args.split(" ")
|
|
|
|
self.pubport = subport
|
|
|
|
self.connect_pub(self.host, self.pubport)
|
|
|
|
return
|
2017-09-17 14:29:23 -07:00
|
|
|
raise PublishSetupException("Could not establish publisher socket")
|
|
|
|
finally:
|
|
|
|
self.unsub("__msgbus_meta")
|
|
|
|
|
|
|
|
def pub(self, channel, message, encode_msg=True, settle=True, timeout=5):
|
|
|
|
if encode_msg:
|
|
|
|
message = message.encode("utf-8")
|
|
|
|
if not self.pub_socket:
|
|
|
|
with self.lock:
|
2017-09-17 20:28:03 -07:00
|
|
|
self.prepare_pub(timeout=timeout, settle=settle)
|
2017-09-17 14:29:23 -07:00
|
|
|
self.pub_socket.send(channel.encode("utf-8") + b' ' + message)
|
2017-09-17 20:28:03 -07:00
|
|
|
|
|
|
|
def prepare_pub(self, timeout=5, settle=True):
|
|
|
|
self._setup_publish_socket(timeout)
|
|
|
|
if settle:
|
|
|
|
sleep(1)
|