From ba0e9810a840e1fa0d24c8fb6868a527896f23be Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 17 Sep 2017 13:03:25 -0700 Subject: [PATCH] ctrl-c handling --- server.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/server.py b/server.py index a8140bc..ae2021c 100644 --- a/server.py +++ b/server.py @@ -1,9 +1,10 @@ +import sys import asyncio from time import time import zmq.asyncio from contextlib import closing -# from concurrent.futures import ThreadPoolExecutor from random import randint +import signal zmq.asyncio.install() @@ -19,6 +20,10 @@ def exewrap(func): class MsgBusServerPeer(object): + + __slots__ = ["alive", "server", "name", "pub_port", "sub_port", "bind", "protocol", "sub_sock", "sub_sock_addr", + "pub_sock", "pub_sock_addr", "last_keepalive", "confpeer"] + """ A link to another msgbus server that we share messages with and get messages from. @@ -117,6 +122,11 @@ class MsgBusServerPeer(object): class MsgBusServer(object): + + __slots__ = ["alive", "loop", "ctx", "subport", "pubport", "protocol", "sub_sock", "sub_sock_addr", "pub_sock", + "pub_sock_addr", "seed_peers", "peers", "name", "port_range", "counter_local_messages", + "counter_remote_messages", "conf", "inprogress_connects"] + def __init__(self, loop, ctx, subport, pubport, port_range, peers): assert subport != pubport self.alive = True # TODO move this? @@ -167,7 +177,7 @@ class MsgBusServer(object): show_idle_stats = self.conf.get("show_idle_stats", True) counter_local_total = 0 counter_remote_total = 0 - while True: + while self.alive: await asyncio.sleep(interval) if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0: counter_local_total += self.counter_local_messages @@ -240,7 +250,7 @@ class MsgBusServer(object): with closing(self.ctx.socket(zmq.PUB)) as peer_sub_socket: peer_sub_socket.connect(peer_sub_addr) await asyncio.sleep(1) - while True: + while self.alive: await peer_sub_socket.send("__msgbus_meta __peer_request {}".format(self.name).encode('utf-8')) peer_response = await wait_for_cmd("__peer_response") if peer_response: @@ -265,7 +275,7 @@ class MsgBusServer(object): Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include: * __my_ports: a listing of this nodes pub and sub ports and protocols """ - while True: + while self.alive: msg = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol, self.pubport, self.protocol) self.pub_sock.send(msg.encode('utf-8')) @@ -273,7 +283,7 @@ class MsgBusServer(object): async def reciever(self): try: - while True: + while self.alive: # print("recv") msg = await self.sub_sock.recv() msg = msg.decode("utf-8") @@ -340,10 +350,22 @@ def main(): with zmq.asyncio.Context() as ctx: loop = asyncio.get_event_loop() - # loop.set_default_executor(ThreadPoolExecutor(max_workers=10)) - loop.set_debug(True) + # loop.set_debug(True) server = MsgBusServer(loop, ctx, int(args.port), int(args.port) + 1, port_range=args.port_range, peers=args.peers if args.peers else []) + + def signal_handler(signum, stack): + nonlocal loop + print('Received:', signum) + server.alive = False + ctx.destroy() + # from time import sleep + # sleep(1) + # sys.exit(signum) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + loop.run_until_complete(server.run())