|
|
|
@ -1,9 +1,10 @@
|
|
|
|
|
import sys
|
|
|
|
|
import asyncio
|
|
|
|
|
from time import time
|
|
|
|
|
import zmq.asyncio
|
|
|
|
|
from contextlib import closing
|
|
|
|
|
# from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
from random import randint
|
|
|
|
|
import signal
|
|
|
|
|
zmq.asyncio.install()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -19,6 +20,10 @@ def exewrap(func):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MsgBusServerPeer(object):
|
|
|
|
|
|
|
|
|
|
__slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr",
|
|
|
|
|
"pub_sock", "pub_sock_addr", "last_keepalive", "confpeer"]
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
A link to another msgbus server that we share messages with and get messages from.
|
|
|
|
|
|
|
|
|
@ -117,6 +122,11 @@ class MsgBusServerPeer(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MsgBusServer(object):
|
|
|
|
|
|
|
|
|
|
__slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock",
|
|
|
|
|
"pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages",
|
|
|
|
|
"counter_remote_messages", "conf", "inprogress_connects"]
|
|
|
|
|
|
|
|
|
|
def __init__(self, loop, ctx, subport, pubport, port_range, peers):
|
|
|
|
|
assert subport != pubport
|
|
|
|
|
self.alive = True # TODO move this?
|
|
|
|
@ -167,7 +177,7 @@ class MsgBusServer(object):
|
|
|
|
|
show_idle_stats = self.conf.get("show_idle_stats", True)
|
|
|
|
|
counter_local_total = 0
|
|
|
|
|
counter_remote_total = 0
|
|
|
|
|
while True:
|
|
|
|
|
while self.alive:
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0:
|
|
|
|
|
counter_local_total += self.counter_local_messages
|
|
|
|
@ -240,7 +250,7 @@ class MsgBusServer(object):
|
|
|
|
|
with closing(self.ctx.socket(zmq.PUB)) as peer_sub_socket:
|
|
|
|
|
peer_sub_socket.connect(peer_sub_addr)
|
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
while True:
|
|
|
|
|
while self.alive:
|
|
|
|
|
await peer_sub_socket.send("__msgbus_meta __peer_request {}".format(self.name).encode('utf-8'))
|
|
|
|
|
peer_response = await wait_for_cmd("__peer_response")
|
|
|
|
|
if peer_response:
|
|
|
|
@ -265,7 +275,7 @@ class MsgBusServer(object):
|
|
|
|
|
Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include:
|
|
|
|
|
* __my_ports: a listing of this nodes pub and sub ports and protocols
|
|
|
|
|
"""
|
|
|
|
|
while True:
|
|
|
|
|
while self.alive:
|
|
|
|
|
msg = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol,
|
|
|
|
|
self.pubport, self.protocol)
|
|
|
|
|
self.pub_sock.send(msg.encode('utf-8'))
|
|
|
|
@ -273,7 +283,7 @@ class MsgBusServer(object):
|
|
|
|
|
|
|
|
|
|
async def reciever(self):
|
|
|
|
|
try:
|
|
|
|
|
while True:
|
|
|
|
|
while self.alive:
|
|
|
|
|
# print("recv")
|
|
|
|
|
msg = await self.sub_sock.recv()
|
|
|
|
|
msg = msg.decode("utf-8")
|
|
|
|
@ -340,10 +350,22 @@ def main():
|
|
|
|
|
|
|
|
|
|
with zmq.asyncio.Context() as ctx:
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
# loop.set_default_executor(ThreadPoolExecutor(max_workers=10))
|
|
|
|
|
loop.set_debug(True)
|
|
|
|
|
# loop.set_debug(True)
|
|
|
|
|
server = MsgBusServer(loop, ctx, int(args.port), int(args.port) + 1,
|
|
|
|
|
port_range=args.port_range, peers=args.peers if args.peers else [])
|
|
|
|
|
|
|
|
|
|
def signal_handler(signum, stack):
|
|
|
|
|
nonlocal loop
|
|
|
|
|
print('Received:', signum)
|
|
|
|
|
server.alive = False
|
|
|
|
|
ctx.destroy()
|
|
|
|
|
# from time import sleep
|
|
|
|
|
# sleep(1)
|
|
|
|
|
# sys.exit(signum)
|
|
|
|
|
|
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
|
|
|
|
|
|
loop.run_until_complete(server.run())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|