ZeroMQ-based python message broker server/client
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

394 lines
17 KiB

import sys
import asyncio
from time import time
import zmq.asyncio
from contextlib import closing
from random import randint
import signal
class MsgBusServerPeer(object):
__slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr",
"pub_sock", "pub_sock_addr", "last_keepalive", "confpeer", "host"]
A link to another msgbus server that we share messages with and get messages from.
Peering process:
- Another node sends, to our sub port, a __peer_request message with a server name
- We send a __peer_response message with two port numbers
- We start listening pub/sub on those two ports
- Anything we receive goes out to all our clients and peers EXCEPT this one
def __init__(self, server, name, host, pub_port, sub_port, bind=True):
self.alive = False
self.server = server = name = host
self.pub_port = pub_port
self.sub_port = sub_port
self.bind = bind
self.protocol = 'tcp'
self.sub_sock = None
self.sub_sock_addr = '{}://{}:{}'.format(self.protocol,, self.sub_port)
self.pub_sock = None
self.pub_sock_addr = '{}://{}:{}'.format(self.protocol,, self.pub_port)
self.last_keepalive = time()
self.confpeer = None
async def run(self):
# Wait for messages
# On recv, pass it to the server for transmission to all peers but the sender
with closing(self.server.ctx.socket(zmq.SUB)) as self.sub_sock:
with closing(self.server.ctx.socket(zmq.PUB)) as self.pub_sock:
if self.bind:
self.alive = True
await asyncio.wait([self.listen(),
self.keep_alive_sender()], loop=self.server.loop)
async def listen(self):
while self.alive:
# __peer_msg <origin> <topic> <data>
# print("peer recv")
msg = await self.sub_sock.recv()
topic, msg = msg.decode('utf-8').split(' ', 1)
# print('peer received topic:"{}" data:"{}"'.format(topic, msg))
if topic == "__peer_msg":
sender, data = msg.split(' ', 1)
self.server.counter_remote_messages += 1
await asyncio.wait([self.server.pub_sock.send(data.encode("utf-8")),
self.server.send_to_peers(data, exclude=sender)],
# print("Sent!")
elif topic == "__peer_keepalive":
self.last_keepalive = time()
async def local_send(self, msg):
# Send a raw message to peers
if self.alive:
await self.pub_sock.send('__peer_msg {} {}'.format(, msg).encode('utf-8'))
async def keep_alive_sender(self):
interval = self.server.conf.get("peer_keepalive_interval", 1)
while self.alive:
await self.pub_sock.send("__peer_keepalive ping".encode('utf-8'))
await asyncio.sleep(interval)
async def keep_alive_listener(self):
interval = self.server.conf.get("peer_keepalive_timeout", 5)
while self.alive:
if time() - self.last_keepalive > interval:
print("Peer '{}' is lost!".format(
await asyncio.sleep(1)
def shutdown(self):
self.alive = False
class MsgBusServer(object):
__slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock",
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
"counter_remote_messages", "conf", "inprogress_connects", "bind_host", "verbose"]
def __init__(self, loop, ctx, bind_host, pubport, subport, port_range, peers, name=None, verbose=True):
assert subport != pubport
self.verbose = verbose
self.alive = True # TODO move this?
self.loop = loop
self.ctx = ctx
self.bind_host = bind_host
self.subport = subport
self.pubport = pubport
self.protocol = 'tcp'
self.sub_sock = None
self.sub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.subport)
self.pub_sock = None
self.pub_sock_addr = '{}://{}:{}'.format(self.protocol, self.bind_host, self.pubport)
self.seed_peers = peers
self.peers = {} = name if name else"randomname_{}".format(randint(0, 420000))
self.port_range = port_range
self.counter_local_messages = 0
self.counter_remote_messages = 0
self.conf = {}
self.inprogress_connects = []
async def run(self):
with closing(self.ctx.socket(zmq.SUB)) as self.sub_sock:
with closing(self.ctx.socket(zmq.PUB)) as self.pub_sock:
jobs = [self.reciever(),
if self.verbose:
await asyncio.wait(jobs, loop=self.loop)
async def stats_monitor(self):
Print out stats on an interval (messages/s etc)
start = time()
last = start - 1
interval = self.conf.get("stats_interval", 5)
show_idle_stats = self.conf.get("show_idle_stats", True)
counter_local_total = 0
counter_remote_total = 0
while self.alive:
await asyncio.sleep(interval)
if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0:
_interval = round(time() - last, 2)
counter_local_total += self.counter_local_messages
counter_remote_total += self.counter_remote_messages
total = self.counter_local_messages + self.counter_remote_messages
uptime = round(time() - start, 2)
total_life = counter_local_total + counter_remote_total
totals_local = "Total: {}s: {} (L:{} R:{})" \
.format(_interval, total, self.counter_local_messages, self.counter_remote_messages)
totals_lifetime = "Lifetime {}s {} (L:{} R: {})" \
.format(round(uptime), total_life, counter_local_total, counter_remote_total)
tps_interval = "Tps: {}s: {}/s (L:{} R:{})" \
round(total / _interval, 2),
round(self.counter_local_messages / _interval, 2),
round(self.counter_remote_messages / _interval, 2))
tps_lifetime = "Lifetime {}s {} (L:{} R: {})" \
round(total_life / uptime, 2),
round(counter_local_total / uptime, 2),
round(counter_remote_total / uptime, 2))
peer_info = "Peers: total: {}: {}".format(len(self.peers), ', '.join(self.peers.keys()))
print("{: <40} {: <40}\n{: <40} {: <40}\n{}"
.format(totals_local, totals_lifetime, tps_interval, tps_lifetime, peer_info))
self.counter_local_messages = 0
self.counter_remote_messages = 0
last = time()
async def peer_monitor(self):
# ensure we stay connected to peers
while self.alive:
for peer_addr in self.seed_peers:
if not self.has_peer(peer_addr):
print("Connecting to {}".format(peer_addr))
self.loop.call_soon(asyncio.ensure_future, self.connect_to_peer(peer_addr))
await asyncio.sleep(1)
def has_peer(self, peer_name):
if peer_name in self.inprogress_connects:
return True
if peer_name in [i.confpeer for k, i in self.peers.items()]:
return True
return False
async def connect_to_peer(self, peer_name):
host, pub_port = peer_name.split(":")
peer_pub_addr = '{}://{}:{}'.format(self.protocol, host, pub_port)
with closing(self.ctx.socket(zmq.SUB)) as peer_pub_socket:
async def wait_for_cmd(cmd_name, timeout=10):
start = time()
while time() - start < timeout:
msg = await peer_pub_socket.recv(flags=zmq.NOBLOCK)
if msg:
_, cmd, data = msg.decode("utf-8").split(" ", 2) # only subscribed to __msgbus_meta
if cmd == cmd_name:
return data
except zmq.error.Again:
await asyncio.sleep(0.1)
# print("waiting for peer info")
peer_info = await wait_for_cmd("__my_info")
if not peer_info:
print("peering failed for", peer_name)
remote_name, subport, subproto, pubbport, pubproto = peer_info.split()
peer_sub_addr = '{}://{}:{}'.format(subproto, host, subport)
peer_response = []
# print(peer_sub_addr)
with closing(self.ctx.socket(zmq.PUB)) as peer_sub_socket:
await asyncio.sleep(1)
while self.alive:
await peer_sub_socket.send("__msgbus_meta __peer_request {}".format('utf-8'))
peer_response = await wait_for_cmd("__peer_response")
if peer_response:
# print("got peer resp: ", peer_response)
name, _ = peer_response.split(" ", 1)
if name ==
await asyncio.sleep(1)
if not peer_response:
print("peering failed for", peer_name)
_, pubport, pubproto, subport, subproto = peer_response.split()
peer = self.new_peer(remote_name, host, pub_port=pubport, sub_port=subport, bind=False)
peer.confpeer = peer_name
# print("Added peer", peer_name)
async def heartbeat(self):
Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include:
* __my_ports: a listing of this nodes pub and sub ports and protocols
heartbeat = '__msgbus_meta __my_info {} {} {} {} {}'.format(, self.subport, self.protocol,
self.pubport, self.protocol).encode('utf-8')
while self.alive:
await asyncio.sleep(1)
async def reciever(self):
while self.alive:
# print("recv")
msg = await self.sub_sock.recv()
_msg = msg.decode("utf-8")
# print('received topic:"{}" data:"{}"'.format(topic, data))
if _msg[0:13] == "__msgbus_meta":
await self.process_meta(_msg.split(' ', 1)[1])
# scheduling the message to be sent "soon" seems to double the performance at the cost of spamming the
# event pool. Which seems to cause weird timing slowdowns (e.g. sleeps taking longer than expected).
# If raw throughput is favorable to you over precision timing, uncomment these two lines and comment out
# the following await.
# self.loop.call_soon(asyncio.ensure_future, self.pub_sock.send(msg))
# self.loop.call_soon(asyncio.ensure_future, self.send_to_peers(_msg))
await asyncio.wait([self.pub_sock.send(msg), self.send_to_peers(_msg)], loop=self.loop)
self.counter_local_messages += 1
async def process_meta(self, data):
command, rest = data.split(" ", 1)
if command == "__peer_request":
await self.handle_peer_request(rest)
async def handle_peer_request(self, peer_name):
assert type(peer_name) is str
peer = self.new_peer(peer_name, self.bind_host)
await self.pub_sock.send("__msgbus_meta __peer_response {} {} tcp {} tcp"
.format(peer_name, peer.pub_port, peer.sub_port)
def new_peer(self, peer_name, host, pub_port=None, sub_port=None, bind=True):
if peer_name not in self.peers:
if pub_port is None:
pub_port = randint(*self.port_range)
if sub_port is None:
sub_port = pub_port + 1
peer_ports = (sub_port, pub_port)
print("New peer '{}' on {} P:{} S:{}".format(peer_name, host, pub_port, sub_port))
self.peers[peer_name] = MsgBusServerPeer(self, peer_name, host, *peer_ports, bind=bind)
self.loop.call_soon(asyncio.ensure_future, self.peers[peer_name].run()) # seems to act like a fork
return self.peers[peer_name]
def disconnect_peer(self, peer_name):
if peer_name in self.peers:
peer = self.peers[peer_name]
del self.peers[peer_name]
async def send_to_peers(self, msg, exclude=None):
for peer_name, peer in self.peers.items():
if peer_name == exclude:
await peer.local_send(msg)
def main():
import argparse
parser = argparse.ArgumentParser(description="msgbus server")
parser.add_argument("-b", "--bind-host", default="", help="bind host")
parser.add_argument("-p", "--port", default=7000, help="server publisher port")
parser.add_argument("-n", "--peers", nargs="+", help="connect to peer's publisher port")
parser.add_argument("--name", help="set node name")
parser.add_argument("-r", "--port-range", default=[7010, 7400], nargs=2, help="peer port range")
parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logging")
args = parser.parse_args()
with zmq.asyncio.Context() as ctx:
loop = asyncio.get_event_loop()
# loop.set_debug(True)
port_range = (int(args.port_range[0]), int(args.port_range[1]))
server = MsgBusServer(loop, ctx, args.bind_host, int(args.port), int(args.port) + 1,
port_range=port_range, peers=args.peers if args.peers else [],, verbose=args.verbose)
def signal_handler(signum, stack):
nonlocal loop
print('Received:', signum)
server.alive = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == '__main__':
More on peering:
./ -p 7003 # start a node on port 7003 + 7004
./ -p 7200 --peer # connect another node
./ -p 7300 --peer # connect a third node, in a chain
./ -p 7400 --peer # connect a third node, making 7200 a hub
- This server has 2 ports for all normal clients
- peers use this normal port to request peering. The existing server is the REMOTE, the other is CLIENT
- CLIENT: subscribe to the configured host and IP, it is a publishing port
- REMOTE: (periodically) sends and info heartbeat advertising the REMOTE's subscribing port
- CLIENT: waits for ^ msg, connects to REMOTE's sub port
- CLIENT: send a __peer_request, indicating our identity (name)
- REMOTE: (always upon receiving a __peer_request) create a MsgBusServerPeer if one doesnt exist for the requester's
identity. It would start listening now or already be. Send a __peer_response containing the peer's name and ports.
- CLIENT: wait for a __peer_response. retry or panic if doesnt come X seconds after the request. Create a
MsgBusServerPeer object (probably manually checking for dupe this time).
- Creates a pub/sub port pair for the peer
- In parallel:
- Listen for incoming messages from peers
- Upon recv, forward to all peers (besides the sender) and local clients
- When local clients send msgs, send to all peers