set server name
This commit is contained in:
parent
b0e329aa63
commit
ee9871a456
|
@ -52,8 +52,6 @@ class MsgBusServerPeer(object):
|
||||||
|
|
||||||
self.confpeer = None
|
self.confpeer = None
|
||||||
|
|
||||||
print("New peer on {}, {}".format(self.pub_port, self.sub_port))
|
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
# Wait for messages
|
# Wait for messages
|
||||||
# On recv, pass it to the server for transmission to all peers but the sender
|
# On recv, pass it to the server for transmission to all peers but the sender
|
||||||
|
@ -120,7 +118,7 @@ class MsgBusServer(object):
|
||||||
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
|
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
|
||||||
"counter_remote_messages", "conf", "inprogress_connects", "bind_host"]
|
"counter_remote_messages", "conf", "inprogress_connects", "bind_host"]
|
||||||
|
|
||||||
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers):
|
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None):
|
||||||
assert subport != pubport
|
assert subport != pubport
|
||||||
self.alive = True # TODO move this?
|
self.alive = True # TODO move this?
|
||||||
|
|
||||||
|
@ -139,7 +137,7 @@ class MsgBusServer(object):
|
||||||
self.seed_peers = peers
|
self.seed_peers = peers
|
||||||
self.peers = {}
|
self.peers = {}
|
||||||
|
|
||||||
self.name = "randomname_{}".format(randint(0, 420000))
|
self.name = name if name else"randomname_{}".format(randint(0, 420000))
|
||||||
|
|
||||||
self.port_range = port_range
|
self.port_range = port_range
|
||||||
|
|
||||||
|
@ -178,7 +176,7 @@ class MsgBusServer(object):
|
||||||
_interval = round(time() - last, 2)
|
_interval = round(time() - last, 2)
|
||||||
counter_local_total += self.counter_local_messages
|
counter_local_total += self.counter_local_messages
|
||||||
counter_remote_total += self.counter_remote_messages
|
counter_remote_total += self.counter_remote_messages
|
||||||
print("Last {}s i delivered {} messages locally ({}/s)"
|
print("\nLast {}s i delivered {} messages locally ({}/s)"
|
||||||
.format(_interval, self.counter_local_messages,
|
.format(_interval, self.counter_local_messages,
|
||||||
round(self.counter_local_messages / _interval, 2)))
|
round(self.counter_local_messages / _interval, 2)))
|
||||||
print("Last {}s i delivered {} messages remotely ({}/s)"
|
print("Last {}s i delivered {} messages remotely ({}/s)"
|
||||||
|
@ -297,7 +295,6 @@ class MsgBusServer(object):
|
||||||
self.counter_local_messages += 1
|
self.counter_local_messages += 1
|
||||||
|
|
||||||
async def process_meta(self, data):
|
async def process_meta(self, data):
|
||||||
print("Got meta: {}".format(data))
|
|
||||||
command, rest = data.split(" ", 1)
|
command, rest = data.split(" ", 1)
|
||||||
if command == "__peer_request":
|
if command == "__peer_request":
|
||||||
await self.handle_peer_request(rest)
|
await self.handle_peer_request(rest)
|
||||||
|
@ -316,6 +313,7 @@ class MsgBusServer(object):
|
||||||
if sub_port is None:
|
if sub_port is None:
|
||||||
sub_port = pub_port + 1
|
sub_port = pub_port + 1
|
||||||
peer_ports = (sub_port, pub_port)
|
peer_ports = (sub_port, pub_port)
|
||||||
|
print("New peer '{}' on P:{} S:{}".format(peer_name, pub_port, sub_port))
|
||||||
self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind)
|
self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind)
|
||||||
self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork
|
self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork
|
||||||
return self.peers[peer_name]
|
return self.peers[peer_name]
|
||||||
|
@ -339,6 +337,7 @@ def main():
|
||||||
parser.add_argument("-b", "--bind-host", default="0.0.0.0", help="bind host")
|
parser.add_argument("-b", "--bind-host", default="0.0.0.0", help="bind host")
|
||||||
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
|
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
|
||||||
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
|
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
|
||||||
|
parser.add_argument("--name", help="set node name")
|
||||||
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
|
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
@ -346,7 +345,8 @@ def main():
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
# loop.set_debug(True)
|
# loop.set_debug(True)
|
||||||
server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1,
|
server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1,
|
||||||
port_range=args.port_range, peers=args.peers if args.peers else [])
|
port_range=args.port_range, peers=args.peers if args.peers else [],
|
||||||
|
name=args.name)
|
||||||
|
|
||||||
def signal_handler(signum, stack):
|
def signal_handler(signum, stack):
|
||||||
nonlocal loop
|
nonlocal loop
|
||||||
|
|
Loading…
Reference in New Issue