remove 127.0.0.1 hardcodings
This commit is contained in:
parent
658fbe5b3e
commit
f0b68ee0b3
|
@ -59,6 +59,5 @@ Of course, messages published when there's no server or no listeners are silentl
|
||||||
TODO
|
TODO
|
||||||
----
|
----
|
||||||
|
|
||||||
* Remove many 127.0.0.1 hardcodings
|
|
||||||
* Improve performance
|
* Improve performance
|
||||||
* Create docs
|
* Create docs
|
||||||
|
|
|
@ -22,7 +22,7 @@ def exewrap(func):
|
||||||
class MsgBusServerPeer(object):
|
class MsgBusServerPeer(object):
|
||||||
|
|
||||||
__slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr",
|
__slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr",
|
||||||
"pub_sock", "pub_sock_addr", "last_keepalive", "confpeer"]
|
"pub_sock", "pub_sock_addr", "last_keepalive", "confpeer", "host"]
|
||||||
|
|
||||||
"""
|
"""
|
||||||
A link to another msgbus server that we share messages with and get messages from.
|
A link to another msgbus server that we share messages with and get messages from.
|
||||||
|
@ -33,19 +33,20 @@ class MsgBusServerPeer(object):
|
||||||
- We start listening pub/sub on those two ports
|
- We start listening pub/sub on those two ports
|
||||||
- Anything we receive goes out to all our clients and peers EXCEPT this one
|
- Anything we receive goes out to all our clients and peers EXCEPT this one
|
||||||
"""
|
"""
|
||||||
def __init__(self, server, name, pub_port, sub_port, bind=True):
|
def __init__(self, server, name, host, pub_port, sub_port, bind=True):
|
||||||
self.alive = False
|
self.alive = False
|
||||||
self.server = server
|
self.server = server
|
||||||
self.name = name
|
self.name = name
|
||||||
|
self.host = host
|
||||||
self.pub_port = pub_port
|
self.pub_port = pub_port
|
||||||
self.sub_port = sub_port
|
self.sub_port = sub_port
|
||||||
self.bind = bind
|
self.bind = bind
|
||||||
|
|
||||||
self.protocol = 'tcp'
|
self.protocol = 'tcp'
|
||||||
self.sub_sock = None
|
self.sub_sock = None
|
||||||
self.sub_sock_addr = '{}://127.0.0.1:{}'.format(self.protocol, self.sub_port)
|
self.sub_sock_addr = '{}://{}:{}'.format(self.protocol, self.host, self.sub_port)
|
||||||
self.pub_sock = None
|
self.pub_sock = None
|
||||||
self.pub_sock_addr = '{}://127.0.0.1:{}'.format(self.protocol, self.pub_port)
|
self.pub_sock_addr = '{}://{}:{}'.format(self.protocol, self.host, self.pub_port)
|
||||||
|
|
||||||
self.last_keepalive = time()
|
self.last_keepalive = time()
|
||||||
|
|
||||||
|
@ -125,22 +126,23 @@ class MsgBusServer(object):
|
||||||
|
|
||||||
__slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock",
|
__slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock",
|
||||||
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
|
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
|
||||||
"counter_remote_messages", "conf", "inprogress_connects"]
|
"counter_remote_messages", "conf", "inprogress_connects", "bind_host"]
|
||||||
|
|
||||||
def __init__(self, loop, ctx, pubport, subport, port_range, peers):
|
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers):
|
||||||
assert subport != pubport
|
assert subport != pubport
|
||||||
self.alive = True # TODO move this?
|
self.alive = True # TODO move this?
|
||||||
|
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.ctx = ctx
|
self.ctx = ctx
|
||||||
|
self.bind_host = bind_host
|
||||||
self.subport = subport
|
self.subport = subport
|
||||||
self.pubport = pubport
|
self.pubport = pubport
|
||||||
|
|
||||||
self.protocol = 'tcp'
|
self.protocol = 'tcp'
|
||||||
self.sub_sock = None
|
self.sub_sock = None
|
||||||
self.sub_sock_addr = '{}://127.0.0.1:{}'.format(self.protocol, self.subport)
|
self.sub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.subport)
|
||||||
self.pub_sock = None
|
self.pub_sock = None
|
||||||
self.pub_sock_addr = '{}://127.0.0.1:{}'.format(self.protocol, self.pubport)
|
self.pub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.pubport)
|
||||||
|
|
||||||
self.seed_peers = peers
|
self.seed_peers = peers
|
||||||
self.peers = {}
|
self.peers = {}
|
||||||
|
@ -264,7 +266,7 @@ class MsgBusServer(object):
|
||||||
return
|
return
|
||||||
|
|
||||||
_, pubport, pubproto, subport, subproto = peer_response.split()
|
_, pubport, pubproto, subport, subproto = peer_response.split()
|
||||||
peer = self.get_peer(remote_name, pub_port=pubport, sub_port=subport, bind=False)
|
peer = self.new_peer(remote_name, host, pub_port=pubport, sub_port=subport, bind=False)
|
||||||
peer.confpeer = peer_name
|
peer.confpeer = peer_name
|
||||||
print("Added peer", peer_name)
|
print("Added peer", peer_name)
|
||||||
finally:
|
finally:
|
||||||
|
@ -311,19 +313,19 @@ class MsgBusServer(object):
|
||||||
|
|
||||||
async def handle_peer_request(self, peer_name):
|
async def handle_peer_request(self, peer_name):
|
||||||
assert type(peer_name) is str
|
assert type(peer_name) is str
|
||||||
peer = self.get_peer(peer_name)
|
peer = self.new_peer(peer_name, self.bind_host)
|
||||||
await self.pub_sock.send("__msgbus_meta __peer_response {} {} tcp {} tcp"
|
await self.pub_sock.send("__msgbus_meta __peer_response {} {} tcp {} tcp"
|
||||||
.format(peer_name, peer.pub_port, peer.sub_port)
|
.format(peer_name, peer.pub_port, peer.sub_port)
|
||||||
.encode("utf-8"))
|
.encode("utf-8"))
|
||||||
|
|
||||||
def get_peer(self, peer_name, pub_port=None, sub_port=None, bind=True):
|
def new_peer(self, peer_name, host, pub_port=None, sub_port=None, bind=True):
|
||||||
if peer_name not in self.peers:
|
if peer_name not in self.peers:
|
||||||
if pub_port is None:
|
if pub_port is None:
|
||||||
pub_port = randint(*self.port_range)
|
pub_port = randint(*self.port_range)
|
||||||
if sub_port is None:
|
if sub_port is None:
|
||||||
sub_port = pub_port + 1
|
sub_port = pub_port + 1
|
||||||
peer_ports = (sub_port, pub_port)
|
peer_ports = (sub_port, pub_port)
|
||||||
self.peers[peer_name] = MsgBusServerPeer(self, peer_name, *peer_ports, bind=bind)
|
self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind)
|
||||||
self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork
|
self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork
|
||||||
return self.peers[peer_name]
|
return self.peers[peer_name]
|
||||||
|
|
||||||
|
@ -343,6 +345,7 @@ class MsgBusServer(object):
|
||||||
def main():
|
def main():
|
||||||
import argparse
|
import argparse
|
||||||
parser = argparse.ArgumentParser(description="msgbus server")
|
parser = argparse.ArgumentParser(description="msgbus server")
|
||||||
|
parser.add_argument("-b", "--bind-host", default="0.0.0.0", help="bind host")
|
||||||
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
|
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
|
||||||
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
|
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
|
||||||
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
|
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
|
||||||
|
@ -351,7 +354,7 @@ def main():
|
||||||
with zmq.asyncio.Context() as ctx:
|
with zmq.asyncio.Context() as ctx:
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
# loop.set_debug(True)
|
# loop.set_debug(True)
|
||||||
server = MsgBusServer(loop, ctx, int(args.port), int(args.port) + 1,
|
server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1,
|
||||||
port_range=args.port_range, peers=args.peers if args.peers else [])
|
port_range=args.port_range, peers=args.peers if args.peers else [])
|
||||||
|
|
||||||
def signal_handler(signum, stack):
|
def signal_handler(signum, stack):
|
||||||
|
@ -359,9 +362,7 @@ def main():
|
||||||
print('Received:', signum)
|
print('Received:', signum)
|
||||||
server.alive = False
|
server.alive = False
|
||||||
ctx.destroy()
|
ctx.destroy()
|
||||||
# from time import sleep
|
sys.exit(signum)
|
||||||
# sleep(1)
|
|
||||||
# sys.exit(signum)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
signal.signal(signal.SIGINT, signal_handler)
|
||||||
signal.signal(signal.SIGTERM, signal_handler)
|
signal.signal(signal.SIGTERM, signal_handler)
|
||||||
|
|
Loading…
Reference in New Issue