|
|
|
@ -105,10 +105,11 @@ class MsgBusServer(object): |
|
|
|
|
|
|
|
|
|
__slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock", |
|
|
|
|
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages", |
|
|
|
|
"counter_remote_messages", "conf", "inprogress_connects", "bind_host"] |
|
|
|
|
"counter_remote_messages", "conf", "inprogress_connects", "bind_host", "verbose"] |
|
|
|
|
|
|
|
|
|
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None): |
|
|
|
|
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None, verbose=True): |
|
|
|
|
assert subport != pubport |
|
|
|
|
self.verbose = verbose |
|
|
|
|
self.alive = True # TODO move this? |
|
|
|
|
|
|
|
|
|
self.loop = loop |
|
|
|
@ -144,10 +145,12 @@ class MsgBusServer(object): |
|
|
|
|
self.sub_sock.bind(self.sub_sock_addr) |
|
|
|
|
self.sub_sock.subscribe(b'') |
|
|
|
|
self.pub_sock.bind(self.pub_sock_addr) |
|
|
|
|
await asyncio.wait([self.reciever(), |
|
|
|
|
self.heartbeat(), |
|
|
|
|
self.peer_monitor(), |
|
|
|
|
self.stats_monitor()], loop=self.loop) |
|
|
|
|
jobs = [self.reciever(), |
|
|
|
|
self.heartbeat(), |
|
|
|
|
self.peer_monitor()] |
|
|
|
|
if self.verbose: |
|
|
|
|
jobs.append(self.stats_monitor()) |
|
|
|
|
await asyncio.wait(jobs, loop=self.loop) |
|
|
|
|
|
|
|
|
|
async def stats_monitor(self): |
|
|
|
|
""" |
|
|
|
@ -335,6 +338,7 @@ def main(): |
|
|
|
|
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678") |
|
|
|
|
parser.add_argument("--name", help="set node name") |
|
|
|
|
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range") |
|
|
|
|
parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logging") |
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
with zmq.asyncio.Context() as ctx: |
|
|
|
@ -343,7 +347,7 @@ def main(): |
|
|
|
|
port_range = (int(args.port_range[0]), int(args.port_range[1])) |
|
|
|
|
server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1, |
|
|
|
|
port_range=port_range, peers=args.peers if args.peers else [], |
|
|
|
|
name=args.name) |
|
|
|
|
name=args.name, verbose=args.verbose) |
|
|
|
|
|
|
|
|
|
def signal_handler(signum, stack): |
|
|
|
|
nonlocal loop |
|
|
|
|