diff --git a/msgbus/client.py b/msgbus/client.py index f2f0f03..e1311f1 100644 --- a/msgbus/client.py +++ b/msgbus/client.py @@ -8,15 +8,16 @@ class PublishSetupException(Exception): class MsgbusSubClient(object): - def __init__(self, host, port): + def __init__(self, host, port=None, pubport=None): self.host = host self.port = port - self.ctx = None # ZMQ context + self.pubport = pubport + self._ctx = None # ZMQ context self.sub_socket = None # listener sockets self.subscriptions = [] self.pub_socket = None # publisher socket self.lock = Semaphore(1) - self.connect() + # self.connect() def close(self): if self.sub_socket: @@ -26,16 +27,34 @@ class MsgbusSubClient(object): if self.ctx: self.ctx.destroy() + @property + def ctx(self): + if not self._ctx: + self._ctx = zmq.Context() + return self._ctx + def connect(self): - if not self.ctx: - self.ctx = zmq.Context() + if self.port and not self.sub_socket: + self.connect_sub(self.host, self.port) + if self.pubport and not self.pub_socket: + self.connect_pub(self.host, self.pubport) + + def connect_sub(self, host, port): self.sub_socket = self.ctx.socket(zmq.SUB) - self.sub_socket.connect("tcp://{}:{}".format(self.host, self.port)) + self.sub_socket.connect("tcp://{}:{}".format(host, port)) + + def connect_pub(self, host, port): + pub_socket = self.ctx.socket(zmq.PUB) + pub_socket.connect("tcp://{}:{}".format(host, port)) + self.pub_socket = pub_socket def sub(self, channel=None): if channel is None: channel = '' assert type(channel) is str + if not self.sub_socket: + self.connect_sub(self.host, self.port) + print("subbin to", channel) self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8")) self.subscriptions.append(channel) @@ -66,11 +85,10 @@ class MsgbusSubClient(object): meta, args = message.split(" ", 1) if meta != "__my_info": continue - server_name, subport, subproto, pubbport, pubproto = args.split(" ") - remote = "tcp://{}:{}".format(self.host, subport) - pub_socket = self.ctx.socket(zmq.PUB) - pub_socket.connect(remote) - return pub_socket + server_name, subport, subproto, pubport, pubproto = args.split(" ") + self.pubport = subport + self.connect_pub(self.host, self.pubport) + return raise PublishSetupException("Could not establish publisher socket") finally: self.unsub("__msgbus_meta") @@ -80,7 +98,7 @@ class MsgbusSubClient(object): message = message.encode("utf-8") if not self.pub_socket: with self.lock: - self.pub_socket = self._setup_publish_socket(timeout) + self._setup_publish_socket(timeout) if settle: sleep(1) self.pub_socket.send(channel.encode("utf-8") + b' ' + message) diff --git a/msgbus/pub.py b/msgbus/pub.py index efd8b13..a2f8038 100644 --- a/msgbus/pub.py +++ b/msgbus/pub.py @@ -1,7 +1,9 @@ -def send_native(host, port, channel, messages): +def send_native(host, port, pubport, channel, messages): """ - Send some messages on a specified channel using the msgbus client + Send some messages on a specified channel using the msgbus client. Note that if we don't specify a non-None + publishing port when creating the client, which means the client will find it using metadata the server publishes + on `port`. """ from contextlib import closing from msgbus.client import MsgbusSubClient @@ -14,7 +16,10 @@ def send_zmq(host, port, channel, messages): """ Send some messages on a specified channel using a raw zmq socket. Note: the native client connects to the server's publisher port and autodetects the server's subscriber port. The - raw zmq socket must connect directly to the subscriber port + raw zmq socket must connect directly to the subscriber port. The first pub() of the native client will block as it + listens for metadata from the server to establish the underlying zmq publisher socket. if the server is down, the + native client could raise PublishSetupException or block as long as the timeout. The raw zmq socket is allowed to + silently fail, in this example. """ import zmq from time import sleep @@ -41,7 +46,7 @@ def main(): args = parser.parse_args() if args.type == "native": - send_native(args.host, args.port, args.channel, args.message) + send_native(args.host, args.port, None, args.channel, args.message) elif args.type == "raw": send_zmq(args.host, args.port, args.channel, args.message) diff --git a/msgbus/server.py b/msgbus/server.py index ae2021c..e08f3d6 100644 --- a/msgbus/server.py +++ b/msgbus/server.py @@ -127,7 +127,7 @@ class MsgBusServer(object): "pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages", "counter_remote_messages", "conf", "inprogress_connects"] - def __init__(self, loop, ctx, subport, pubport, port_range, peers): + def __init__(self, loop, ctx, pubport, subport, port_range, peers): assert subport != pubport self.alive = True # TODO move this? @@ -343,8 +343,8 @@ class MsgBusServer(object): def main(): import argparse parser = argparse.ArgumentParser(description="msgbus server") - parser.add_argument("-p", "--port", default=7003, help="first listen port (tcp)") - parser.add_argument("-n", "--peers", nargs="+", help="connect to peer 1.2.3.4:5678") + parser.add_argument("-p", "--port", default=7000, help="server publisher port") + parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678") parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range") args = parser.parse_args() diff --git a/msgbus/sub.py b/msgbus/sub.py index 3120a2c..09bee1c 100644 --- a/msgbus/sub.py +++ b/msgbus/sub.py @@ -37,7 +37,7 @@ def main(): import argparse parser = argparse.ArgumentParser(description="dump all messages from msgbus") parser.add_argument("-i", "--host", default="127.0.0.1", help="host to connect to") - parser.add_argument("-p", "--port", default=7003, help="port to connect to") + parser.add_argument("-p", "--port", type=int, help="port to connect to") parser.add_argument("-c", "--channel", nargs="+", help="dump only channels") parser.add_argument("--type", default="native", choices=["native", "raw"], help="client type") args = parser.parse_args()