pymsgbus/msgbus/server.py

395 lines
17 KiB
Python
Raw Permalink Normal View History

2017-09-17 13:03:25 -07:00
import sys
2017-09-05 00:22:24 -07:00
import asyncio
from time import time
import zmq.asyncio
from contextlib import closing
from random import randint
2017-09-17 13:03:25 -07:00
import signal
2017-09-05 00:22:24 -07:00
# Module-level side effect: runs pyzmq's asyncio integration hook at import time, before any socket is created.
# NOTE(review): this call is believed to be deprecated on newer pyzmq releases - confirm before removing.
zmq.asyncio.install()
class MsgBusServerPeer(object):
    """
    A link to another msgbus server that we share messages with and get messages from.

    Peering process:
    - Another node sends, to our sub port, a __peer_request message with a server name
    - We send a __peer_response message with two port numbers
    - We start listening pub/sub on those two ports
    - Anything we receive goes out to all our clients and peers EXCEPT this one
    """

    __slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr",
                 "pub_sock", "pub_sock_addr", "last_keepalive", "confpeer", "host"]

    def __init__(self, server, name, host, pub_port, sub_port, bind=True):
        """
        :param server: owning MsgBusServer; provides ctx, loop, conf, pub_sock and send_to_peers()
        :param name: name of the remote server this link talks to
        :param host: host used to build both socket addresses
        :param pub_port: port our PUB socket binds or connects to
        :param sub_port: port our SUB socket binds or connects to
        :param bind: True to bind both sockets, False to connect them
        """
        self.alive = False
        self.server = server
        self.name = name
        self.host = host
        self.pub_port = pub_port
        self.sub_port = sub_port
        self.bind = bind
        self.protocol = 'tcp'
        self.sub_sock = None
        self.sub_sock_addr = '{}://{}:{}'.format(self.protocol, self.host, self.sub_port)
        self.pub_sock = None
        self.pub_sock_addr = '{}://{}:{}'.format(self.protocol, self.host, self.pub_port)
        self.last_keepalive = time()
        # The "host:port" seed entry this peer was created from; set by the server for outgoing connections
        self.confpeer = None

    async def run(self):
        """
        Open the pub/sub socket pair, then run the listener and both keepalive loops until they finish.
        """
        with closing(self.server.ctx.socket(zmq.SUB)) as self.sub_sock:
            with closing(self.server.ctx.socket(zmq.PUB)) as self.pub_sock:
                if self.bind:
                    self.pub_sock.bind(self.pub_sock_addr)
                    self.sub_sock.bind(self.sub_sock_addr)
                else:
                    self.pub_sock.connect(self.pub_sock_addr)
                    self.sub_sock.connect(self.sub_sock_addr)
                self.sub_sock.subscribe(b'')
                self.alive = True
                # asyncio.wait() no longer accepts bare coroutines or a loop argument on current Python versions,
                # so schedule the jobs on the server's loop explicitly and wait on the resulting futures.
                jobs = [asyncio.ensure_future(job, loop=self.server.loop)
                        for job in (self.listen(), self.keep_alive_listener(), self.keep_alive_sender())]
                await asyncio.wait(jobs)

    async def listen(self):
        """
        Receive frames from the remote server. `__peer_msg <origin> <data>` is re-published to local clients and
        forwarded to every peer except the origin; `__peer_keepalive` refreshes the keepalive timestamp.
        Frames that do not match either shape are ignored rather than terminating the listener.
        """
        while self.alive:
            raw = await self.sub_sock.recv()
            topic, _, body = raw.decode('utf-8').partition(' ')
            if topic == "__peer_msg":
                sender, has_data, data = body.partition(' ')
                if not has_data:
                    # Malformed: no "<origin> <data>" pair. Previously this raised and killed the listener.
                    continue
                self.server.counter_remote_messages += 1
                loop = self.server.loop
                await asyncio.wait([
                    asyncio.ensure_future(self.server.pub_sock.send(data.encode("utf-8")), loop=loop),
                    asyncio.ensure_future(self.server.send_to_peers(data, exclude=sender), loop=loop),
                ])
            elif topic == "__peer_keepalive":
                self.last_keepalive = time()

    async def local_send(self, msg):
        """
        Send a raw message to the remote server, tagged with our server's name as the origin.
        Does nothing while the link is not alive.
        """
        if self.alive:
            await self.pub_sock.send('__peer_msg {} {}'.format(self.server.name, msg).encode('utf-8'))

    async def keep_alive_sender(self):
        """
        Publish a keepalive ping every `peer_keepalive_interval` seconds (default 1) while the link is alive.
        """
        interval = self.server.conf.get("peer_keepalive_interval", 1)
        while self.alive:
            await self.pub_sock.send("__peer_keepalive ping".encode('utf-8'))
            await asyncio.sleep(interval)

    async def keep_alive_listener(self):
        """
        Check once a second whether the remote has been silent for longer than `peer_keepalive_timeout` seconds
        (default 5); if so, ask the server to disconnect this peer and stop checking.
        """
        timeout = self.server.conf.get("peer_keepalive_timeout", 5)
        while self.alive:
            if time() - self.last_keepalive > timeout:
                print("Peer '{}' is lost!".format(self.name))
                self.server.disconnect_peer(self.name)
                break
            await asyncio.sleep(1)

    def shutdown(self):
        """
        Mark the link dead and close whichever sockets exist. Safe to call before run() has created them.
        """
        self.alive = False
        for sock in (self.sub_sock, self.pub_sock):
            if sock is not None:
                sock.close()
class MsgBusServer(object):
    """
    A message bus node. Everything received on the SUB socket is re-published on the PUB socket and forwarded to
    every peered server; frames starting with `__msgbus_meta` are control traffic handled by process_meta().
    """

    __slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock",
                 "pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
                 "counter_remote_messages", "conf", "inprogress_connects", "bind_host", "verbose"]

    def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None, verbose=True):
        """
        :param loop: asyncio event loop the server's tasks are scheduled on
        :param ctx: zmq context used to create every socket
        :param bind_host: host the PUB/SUB sockets (and bound peer sockets) use
        :param pubport: port the PUB socket binds to
        :param subport: port the SUB socket binds to; must differ from pubport
        :param port_range: (low, high) pair handed to randint() when picking a port for a new peer
        :param peers: list of "host:port" seed peers to stay connected to
        :param name: node name; a random one is generated when empty
        :param verbose: when true, run() also starts the periodic stats printer
        """
        assert subport != pubport
        self.verbose = verbose
        self.alive = True  # TODO move this?
        self.loop = loop
        self.ctx = ctx
        self.bind_host = bind_host
        self.subport = subport
        self.pubport = pubport
        self.protocol = 'tcp'
        self.sub_sock = None
        self.sub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.subport)
        self.pub_sock = None
        self.pub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.pubport)
        self.seed_peers = peers
        self.peers = {}
        self.name = name if name else "randomname_{}".format(randint(0, 420000))
        self.port_range = port_range
        self.counter_local_messages = 0
        self.counter_remote_messages = 0
        self.conf = {}
        self.inprogress_connects = []

    async def run(self):
        """
        Bind the PUB/SUB sockets and run the receiver, heartbeat and peer monitor (plus stats when verbose).
        """
        with closing(self.ctx.socket(zmq.SUB)) as self.sub_sock:
            with closing(self.ctx.socket(zmq.PUB)) as self.pub_sock:
                self.sub_sock.bind(self.sub_sock_addr)
                self.sub_sock.subscribe(b'')
                self.pub_sock.bind(self.pub_sock_addr)
                jobs = [self.reciever(),
                        self.heartbeat(),
                        self.peer_monitor()]
                if self.verbose:
                    jobs.append(self.stats_monitor())
                # asyncio.wait() no longer accepts bare coroutines or a loop argument on current Python versions,
                # so wrap each job in a future bound to our loop first.
                await asyncio.wait([asyncio.ensure_future(job, loop=self.loop) for job in jobs])

    async def stats_monitor(self):
        """
        Print out stats on an interval (messages/s etc)
        """
        start = time()
        last = start - 1
        interval = self.conf.get("stats_interval", 5)
        show_idle_stats = self.conf.get("show_idle_stats", True)
        counter_local_total = 0
        counter_remote_total = 0
        while self.alive:
            await asyncio.sleep(interval)
            if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0:
                _interval = round(time() - last, 2)
                counter_local_total += self.counter_local_messages
                counter_remote_total += self.counter_remote_messages
                total = self.counter_local_messages + self.counter_remote_messages
                uptime = round(time() - start, 2)
                total_life = counter_local_total + counter_remote_total
                totals_local = "Total: {}s: {} (L:{} R:{})" \
                    .format(_interval, total, self.counter_local_messages, self.counter_remote_messages)
                totals_lifetime = "Lifetime {}s {} (L:{} R: {})" \
                    .format(round(uptime), total_life, counter_local_total, counter_remote_total)
                tps_interval = "Tps: {}s: {}/s (L:{} R:{})" \
                    .format(_interval,
                            round(total / _interval, 2),
                            round(self.counter_local_messages / _interval, 2),
                            round(self.counter_remote_messages / _interval, 2))
                tps_lifetime = "Lifetime {}s {} (L:{} R: {})" \
                    .format(uptime,
                            round(total_life / uptime, 2),
                            round(counter_local_total / uptime, 2),
                            round(counter_remote_total / uptime, 2))
                peer_info = "Peers: total: {}: {}".format(len(self.peers), ', '.join(self.peers.keys()))
                print("{: <40} {: <40}\n{: <40} {: <40}\n{}"
                      .format(totals_local, totals_lifetime, tps_interval, tps_lifetime, peer_info))
            # Per-interval counters restart after every tick, whether or not stats were printed
            self.counter_local_messages = 0
            self.counter_remote_messages = 0
            last = time()

    async def peer_monitor(self):
        """
        Once a second, start a connection attempt for every seed peer we are neither connected nor connecting to.
        """
        while self.alive:
            for peer_addr in self.seed_peers:
                if not self.has_peer(peer_addr):
                    print("Connecting to {}".format(peer_addr))
                    self.loop.call_soon(asyncio.ensure_future, self.connect_to_peer(peer_addr))
            await asyncio.sleep(1)

    def has_peer(self, peer_name):
        """
        Return True if the seed entry `peer_name` ("host:port") is being connected to or already backs a peer.
        """
        if peer_name in self.inprogress_connects:
            return True
        return any(peer.confpeer == peer_name for peer in self.peers.values())

    async def connect_to_peer(self, peer_name):
        """
        Perform the client side of the peering handshake with the seed entry `peer_name` ("host:pub_port"):
        wait for the remote's __my_info heartbeat, send __peer_request until a __peer_response addressed to us
        arrives, then create the peer link from the advertised ports.
        """
        self.inprogress_connects.append(peer_name)
        try:
            host, pub_port = peer_name.split(":")
            peer_pub_addr = '{}://{}:{}'.format(self.protocol, host, pub_port)
            with closing(self.ctx.socket(zmq.SUB)) as peer_pub_socket:
                peer_pub_socket.connect(peer_pub_addr)
                peer_pub_socket.subscribe(b'__msgbus_meta')

                async def wait_for_cmd(cmd_name, timeout=10):
                    # Poll the remote's meta topic until `cmd_name` shows up; returns its payload, or None on timeout
                    start = time()
                    while time() - start < timeout:
                        try:
                            msg = await peer_pub_socket.recv(flags=zmq.NOBLOCK)
                        except zmq.error.Again:
                            await asyncio.sleep(0.1)
                            continue
                        if msg:
                            # only subscribed to __msgbus_meta; frames shorter than "<topic> <cmd> <data>" are skipped
                            parts = msg.decode("utf-8").split(" ", 2)
                            if len(parts) == 3 and parts[1] == cmd_name:
                                return parts[2]
                    return None

                peer_info = await wait_for_cmd("__my_info")
                info_fields = peer_info.split() if peer_info else []
                if len(info_fields) != 5:
                    print("peering failed for", peer_name)
                    return
                # __my_info payload: <name> <subport> <subproto> <pubport> <pubproto>
                remote_name, subport, subproto = info_fields[0:3]
                peer_sub_addr = '{}://{}:{}'.format(subproto, host, subport)

                accepted = None
                with closing(self.ctx.socket(zmq.PUB)) as peer_sub_socket:
                    peer_sub_socket.connect(peer_sub_addr)
                    await asyncio.sleep(1)
                    while self.alive:
                        await peer_sub_socket.send("__msgbus_meta __peer_request {}".format(self.name).encode('utf-8'))
                        peer_response = await wait_for_cmd("__peer_response")
                        # Only a response addressed to us counts; one meant for another node must not be used
                        if peer_response and peer_response.partition(" ")[0] == self.name:
                            accepted = peer_response
                            break
                        await asyncio.sleep(1)

                response_fields = accepted.split() if accepted else []
                if len(response_fields) != 5:
                    print("peering failed for", peer_name)
                    return
                # __peer_response payload: <our name> <pubport> <pubproto> <subport> <subproto>
                _, pubport, _, subport, _ = response_fields
                peer = self.new_peer(remote_name, host, pub_port=pubport, sub_port=subport, bind=False)
                peer.confpeer = peer_name
        finally:
            self.inprogress_connects.remove(peer_name)

    async def heartbeat(self):
        """
        Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include:
        * __my_info: this node's name followed by its sub and pub ports and protocols
        """
        heartbeat = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol,
                                                                    self.pubport, self.protocol).encode('utf-8')
        while self.alive:
            # Await the send like every other send in this class so failures surface in this task
            await self.pub_sock.send(heartbeat)
            await asyncio.sleep(1)

    async def reciever(self):
        """
        Receive frames from local clients. Meta frames go to process_meta(); everything else is re-published
        locally and forwarded to all peers.
        """
        while self.alive:
            msg = await self.sub_sock.recv()
            _msg = msg.decode("utf-8")
            if _msg.startswith("__msgbus_meta"):
                # partition() yields an empty payload for a bare "__msgbus_meta" frame instead of raising
                await self.process_meta(_msg.partition(' ')[2])
            else:
                # scheduling the message to be sent "soon" seems to double the performance at the cost of spamming the
                # event pool. Which seems to cause weird timing slowdowns (e.g. sleeps taking longer than expected).
                # If raw throughput is favorable to you over precision timing, uncomment these two lines and comment
                # out the following await.
                # self.loop.call_soon(asyncio.ensure_future, self.pub_sock.send(msg))
                # self.loop.call_soon(asyncio.ensure_future, self.send_to_peers(_msg))
                await asyncio.wait([asyncio.ensure_future(self.pub_sock.send(msg), loop=self.loop),
                                    asyncio.ensure_future(self.send_to_peers(_msg), loop=self.loop)])
                # NOTE(review): counted per forwarded message only - confirm meta frames are meant to be excluded
                self.counter_local_messages += 1

    async def process_meta(self, data):
        """
        Dispatch a meta command of the form "<command> <payload>". Unknown commands and commands without a
        payload are ignored.
        """
        command, _, rest = data.partition(" ")
        if command == "__peer_request" and rest:
            await self.handle_peer_request(rest)

    async def handle_peer_request(self, peer_name):
        """
        Create (or reuse) the peer link for `peer_name` and publish a __peer_response carrying its ports.
        """
        assert isinstance(peer_name, str)
        peer = self.new_peer(peer_name, self.bind_host)
        await self.pub_sock.send("__msgbus_meta __peer_response {} {} tcp {} tcp"
                                 .format(peer_name, peer.pub_port, peer.sub_port)
                                 .encode("utf-8"))

    def new_peer(self, peer_name, host, pub_port=None, sub_port=None, bind=True):
        """
        Return the peer link registered under `peer_name`, creating and starting it first if needed.

        When no ports are given, pub_port is drawn with randint() from port_range and sub_port is pub_port + 1.
        """
        if peer_name not in self.peers:
            if pub_port is None:
                pub_port = randint(*self.port_range)
            if sub_port is None:
                sub_port = pub_port + 1
            # The pair is handed over as (sub, pub) to a constructor that takes (pub, sub), so the peer's PUB socket
            # ends up on `sub_port` and its SUB socket on `pub_port`.
            # NOTE(review): for bind=False this cross-connects our PUB to the remote's SUB port, which looks
            # intentional - confirm before "fixing" the order.
            peer_ports = (sub_port, pub_port)
            print("New peer '{}' on {} P:{} S:{}".format(peer_name, host, pub_port, sub_port))
            self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind)
            self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run())  # seems to act like a fork
        return self.peers[peer_name]

    def disconnect_peer(self, peer_name):
        """
        Shut down and forget the peer registered under `peer_name`; unknown names are ignored.
        """
        if peer_name in self.peers:
            peer = self.peers[peer_name]
            peer.shutdown()
            del self.peers[peer_name]

    async def send_to_peers(self, msg, exclude=None):
        """
        Forward `msg` to every peer except the one named `exclude`.
        """
        # Iterate over a snapshot: peers can be added or dropped by other tasks while we are awaiting a send
        for peer_name, peer in list(self.peers.items()):
            if peer_name == exclude:
                continue
            await peer.local_send(msg)
def main(argv=None):
    """
    Command-line entry point: parse the arguments, build a MsgBusServer and run it until it exits or a
    SIGINT/SIGTERM arrives.

    :param argv: argument list to parse; defaults to the process arguments when None
    """
    import argparse
    parser = argparse.ArgumentParser(description="msgbus server")
    parser.add_argument("-b", "--bind-host", default="0.0.0.0", help="bind host")
    # type=int turns a non-numeric value into a usage error instead of a traceback later on
    parser.add_argument("-p", "--port", default=7000, type=int, help="server publisher port")
    parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port 1.2.3.4:5678")
    parser.add_argument("--name", help="set node name")
    parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, type=int, help="peer port range")
    parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logging")
    args = parser.parse_args(argv)

    port_range = (int(args.port_range[0]), int(args.port_range[1]))
    if port_range[0] > port_range[1]:
        # The range is later unpacked into randint(), which rejects an inverted range
        parser.error("--port-range must be given as LOW HIGH")

    with zmq.asyncio.Context() as ctx:
        loop = asyncio.get_event_loop()
        # loop.set_debug(True)
        # The subscriber port is always the publisher port + 1
        server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1,
                              port_range=port_range, peers=args.peers if args.peers else [],
                              name=args.name, verbose=args.verbose)

        def signal_handler(signum, stack):
            # Stop the server loops, tear down every socket in the context and exit with the signal number
            print('Received:', signum)
            server.alive = False
            ctx.destroy()
            sys.exit(signum)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        loop.run_until_complete(server.run())
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
"""
More on peering:
./server.py -p 7003 # start a node on port 7003 + 7004
./server.py -p 7200 --peers 127.0.0.1:7003 # connect another node (to the first node's publisher port)
./server.py -p 7300 --peers 127.0.0.1:7200 # connect a third node, in a chain
./server.py -p 7400 --peers 127.0.0.1:7200 # connect a fourth node, making 7200 a hub
- This server has 2 ports for all normal clients
- peers use this normal port to request peering. The existing server is the REMOTE, the other is CLIENT
- CLIENT: subscribe to the configured host and port, which is the REMOTE's publishing port
- REMOTE: (periodically) sends an info heartbeat advertising the REMOTE's subscribing port
- CLIENT: waits for ^ msg, connects to REMOTE's sub port
- CLIENT: send a __peer_request, indicating our identity (name)
- REMOTE: (always upon receiving a __peer_request) create a MsgBusServerPeer if one doesn't exist for the requester's
identity. It would start listening now or already be. Send a __peer_response containing the peer's name and ports.
- CLIENT: wait for a __peer_response. Retry or panic if it doesn't come X seconds after the request. Create a
MsgBusServerPeer object (probably manually checking for dupe this time).
MsgBusServerPeer:
- Creates a pub/sub port pair for the peer
- In parallel:
- Listen for incoming messages from peers
- Upon recv, forward to all peers (besides the sender) and local clients
- When local clients send msgs, send to all peers
"""