diff --git a/msgbus/server.py b/msgbus/server.py index 6d9bb37..d60163f 100644 --- a/msgbus/server.py +++ b/msgbus/server.py @@ -52,8 +52,6 @@ class MsgBusServerPeer(object): self.confpeer = None - print("New peer on {}, {}".format(self.pub_port, self.sub_port)) - async def run(self): # Wait for messages # On recv, pass it to the server for transmission to all peers but the sender @@ -120,7 +118,7 @@ class MsgBusServer(object): "pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages", "counter_remote_messages", "conf", "inprogress_connects", "bind_host"] - def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers): + def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None): assert subport != pubport self.alive = True # TODO move this? @@ -139,7 +137,7 @@ class MsgBusServer(object): self.seed_peers = peers self.peers = {} - self.name = "randomname_{}".format(randint(0, 420000)) + self.name = name if name else"randomname_{}".format(randint(0, 420000)) self.port_range = port_range @@ -178,7 +176,7 @@ class MsgBusServer(object): _interval = round(time() - last, 2) counter_local_total += self.counter_local_messages counter_remote_total += self.counter_remote_messages - print("Last {}s i delivered {} messages locally ({}/s)" + print("\nLast {}s i delivered {} messages locally ({}/s)" .format(_interval, self.counter_local_messages, round(self.counter_local_messages / _interval, 2))) print("Last {}s i delivered {} messages remotely ({}/s)" @@ -297,7 +295,6 @@ class MsgBusServer(object): self.counter_local_messages += 1 async def process_meta(self, data): - print("Got meta: {}".format(data)) command, rest = data.split(" ", 1) if command == "__peer_request": await self.handle_peer_request(rest) @@ -316,6 +313,7 @@ class MsgBusServer(object): if sub_port is None: sub_port = pub_port + 1 peer_ports = (sub_port, pub_port) + print("New peer '{}' on P:{} S:{}".format(peer_name, pub_port, sub_port)) self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind) self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork return self.peers[peer_name] @@ -339,6 +337,7 @@ def main(): parser.add_argument("-b", "--bind-host", default="0.0.0.0", help="bind host") parser.add_argument("-p", "--port", default=7000, help="server publisher port") parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678") + parser.add_argument("--name", help="set node name") parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range") args = parser.parse_args() @@ -346,7 +345,8 @@ def main(): loop = asyncio.get_event_loop() # loop.set_debug(True) server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1, - port_range=args.port_range, peers=args.peers if args.peers else []) + port_range=args.port_range, peers=args.peers if args.peers else [], + name=args.name) def signal_handler(signum, stack): nonlocal loop