Browse Source

swap ports in server so we always refer to one

master
dave 4 years ago
parent
commit
fffba8beb9
  1. 42
      msgbus/client.py
  2. 13
      msgbus/pub.py
  3. 6
      msgbus/server.py
  4. 2
      msgbus/sub.py

42
msgbus/client.py

@ -8,15 +8,16 @@ class PublishSetupException(Exception):
class MsgbusSubClient(object):
def __init__(self, host, port):
def __init__(self, host, port=None, pubport=None):
self.host = host
self.port = port
self.ctx = None # ZMQ context
self.pubport = pubport
self._ctx = None # ZMQ context
self.sub_socket = None # listener sockets
self.subscriptions = []
self.pub_socket = None # publisher socket
self.lock = Semaphore(1)
self.connect()
# self.connect()
def close(self):
if self.sub_socket:
@ -26,16 +27,34 @@ class MsgbusSubClient(object):
if self.ctx:
self.ctx.destroy()
@property
def ctx(self):
if not self._ctx:
self._ctx = zmq.Context()
return self._ctx
def connect(self):
if not self.ctx:
self.ctx = zmq.Context()
if self.port and not self.sub_socket:
self.connect_sub(self.host, self.port)
if self.pubport and not self.pub_socket:
self.connect_pub(self.host, self.pubport)
def connect_sub(self, host, port):
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.connect("tcp://{}:{}".format(self.host, self.port))
self.sub_socket.connect("tcp://{}:{}".format(host, port))
def connect_pub(self, host, port):
pub_socket = self.ctx.socket(zmq.PUB)
pub_socket.connect("tcp://{}:{}".format(host, port))
self.pub_socket = pub_socket
def sub(self, channel=None):
if channel is None:
channel = ''
assert type(channel) is str
if not self.sub_socket:
self.connect_sub(self.host, self.port)
print("subbin to", channel)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8"))
self.subscriptions.append(channel)
@ -66,11 +85,10 @@ class MsgbusSubClient(object):
meta, args = message.split(" ", 1)
if meta != "__my_info":
continue
server_name, subport, subproto, pubbport, pubproto = args.split(" ")
remote = "tcp://{}:{}".format(self.host, subport)
pub_socket = self.ctx.socket(zmq.PUB)
pub_socket.connect(remote)
return pub_socket
server_name, subport, subproto, pubport, pubproto = args.split(" ")
self.pubport = subport
self.connect_pub(self.host, self.pubport)
return
raise PublishSetupException("Could not establish publisher socket")
finally:
self.unsub("__msgbus_meta")
@ -80,7 +98,7 @@ class MsgbusSubClient(object):
message = message.encode("utf-8")
if not self.pub_socket:
with self.lock:
self.pub_socket = self._setup_publish_socket(timeout)
self._setup_publish_socket(timeout)
if settle:
sleep(1)
self.pub_socket.send(channel.encode("utf-8") + b' ' + message)

13
msgbus/pub.py

@ -1,7 +1,9 @@
def send_native(host, port, channel, messages):
def send_native(host, port, pubport, channel, messages):
"""
Send some messages on a specified channel using the msgbus client
Send some messages on a specified channel using the msgbus client. Note that if we don't specify a non-None
publishing port when creating the client, which means the client will find it using metadata the server publishes
on `port`.
"""
from contextlib import closing
from msgbus.client import MsgbusSubClient
@ -14,7 +16,10 @@ def send_zmq(host, port, channel, messages):
"""
Send some messages on a specified channel using a raw zmq socket.
Note: the native client connects to the server's publisher port and autodetects the server's subscriber port. The
raw zmq socket must connect directly to the subscriber port
raw zmq socket must connect directly to the subscriber port. The first pub() of the native client will block as it
listens for metadata from the server to establish the underlying zmq publisher socket. if the server is down, the
native client could raise PublishSetupException or block as long as the timeout. The raw zmq socket is allowed to
silently fail, in this example.
"""
import zmq
from time import sleep
@ -41,7 +46,7 @@ def main():
args = parser.parse_args()
if args.type == "native":
send_native(args.host, args.port, args.channel, args.message)
send_native(args.host, args.port, None, args.channel, args.message)
elif args.type == "raw":
send_zmq(args.host, args.port, args.channel, args.message)

6
msgbus/server.py

@ -127,7 +127,7 @@ class MsgBusServer(object):
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
"counter_remote_messages", "conf", "inprogress_connects"]
def __init__(self, loop, ctx, subport, pubport, port_range, peers):
def __init__(self, loop, ctx, pubport, subport, port_range, peers):
assert subport != pubport
self.alive = True # TODO move this?
@ -343,8 +343,8 @@ class MsgBusServer(object):
def main():
import argparse
parser = argparse.ArgumentParser(description="msgbus server")
parser.add_argument("-p", "--port", default=7003, help="first listen port (tcp)")
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer 1.2.3.4:5678")
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
args = parser.parse_args()

2
msgbus/sub.py

@ -37,7 +37,7 @@ def main():
import argparse
parser = argparse.ArgumentParser(description="dump all messages from msgbus")
parser.add_argument("-i", "--host", default="127.0.0.1", help="host to connect to")
parser.add_argument("-p", "--port", default=7003, help="port to connect to")
parser.add_argument("-p", "--port", type=int, help="port to connect to")
parser.add_argument("-c", "--channel", nargs="+", help="dump only channels")
parser.add_argument("--type", default="native", choices=["native", "raw"], help="client type")
args = parser.parse_args()

Loading…
Cancel
Save