diff --git a/msgbus/server.py b/msgbus/server.py index ce0fc2e..6d9bb37 100644 --- a/msgbus/server.py +++ b/msgbus/server.py @@ -95,26 +95,18 @@ class MsgBusServerPeer(object): async def keep_alive_sender(self): interval = self.server.conf.get("peer_keepalive_interval", 1) - try: - while self.alive: - await self.pub_sock.send("__peer_keepalive ping".encode('utf-8')) - await asyncio.sleep(interval) - except: - import traceback - traceback.print_exc() + while self.alive: + await self.pub_sock.send("__peer_keepalive ping".encode('utf-8')) + await asyncio.sleep(interval) async def keep_alive_listener(self): interval = self.server.conf.get("peer_keepalive_timeout", 5) - try: - while self.alive: - if time() - self.last_keepalive > interval: - print("Peer {} is lost!".format(self.name)) - self.server.disconnect_peer(self.name) - break - await asyncio.sleep(1) - except: - import traceback - traceback.print_exc() + while self.alive: + if time() - self.last_keepalive > interval: + print("Peer {} is lost!".format(self.name)) + self.server.disconnect_peer(self.name) + break + await asyncio.sleep(1) def shutdown(self): self.alive = False @@ -175,6 +167,7 @@ class MsgBusServer(object): Print out stats on an interval (messages/s etc) """ start = time() + last = start - 1 interval = self.conf.get("stats_interval", 5) show_idle_stats = self.conf.get("show_idle_stats", True) counter_local_total = 0 @@ -182,14 +175,17 @@ class MsgBusServer(object): while self.alive: await asyncio.sleep(interval) if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0: + _interval = round(time() - last, 2) counter_local_total += self.counter_local_messages counter_remote_total += self.counter_remote_messages print("Last {}s i delivered {} messages locally ({}/s)" - .format(interval, self.counter_local_messages, round(self.counter_local_messages / interval, 2))) + .format(_interval, self.counter_local_messages, + round(self.counter_local_messages / _interval, 2))) print("Last {}s i delivered {} messages remotely ({}/s)" - .format(interval, self.counter_remote_messages, round(self.counter_remote_messages / interval, 2))) + .format(_interval, self.counter_remote_messages, + round(self.counter_remote_messages / _interval, 2))) total = self.counter_local_messages + self.counter_remote_messages - print("Last {}s total {} messages ({}/s)".format(interval, total, round(total / interval, 2))) + print("Last {}s total {} messages ({}/s)".format(_interval, total, round(total / _interval, 2))) uptime = round(time() - start, 2) print("Lifetime {}s i delivered {} messages locally ({}/s)" .format(uptime, counter_local_total, round(counter_local_total / uptime, 2))) @@ -199,8 +195,8 @@ class MsgBusServer(object): print("Lifetime {}s total {} messages ({}/s)\n".format(uptime, total, round(total / uptime, 2))) self.counter_local_messages = 0 self.counter_remote_messages = 0 + last = time() - @exewrap async def peer_monitor(self): # ensure we stay connected to peers while self.alive: @@ -217,7 +213,6 @@ class MsgBusServer(object): return True return False - @exewrap async def connect_to_peer(self, peer_name): try: self.inprogress_connects.append(peer_name) @@ -277,39 +272,35 @@ class MsgBusServer(object): Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include: * __my_ports: a listing of this nodes pub and sub ports and protocols """ + heartbeat = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol, + self.pubport, self.protocol).encode('utf-8') while self.alive: - msg = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol, - self.pubport, self.protocol) - self.pub_sock.send(msg.encode('utf-8')) + self.pub_sock.send(heartbeat) await asyncio.sleep(1) async def reciever(self): - try: - while self.alive: - # print("recv") - msg = await self.sub_sock.recv() - msg = msg.decode("utf-8") - topic, data = msg.split(' ', 1) - # print('received topic:"{}" data:"{}"'.format(topic, data)) - if topic == "__msgbus_meta": - await self.process_meta(data) - else: - await asyncio.wait([self.pub_sock.send(msg.encode("utf-8")), self.send_to_peers(msg)], - loop=self.loop) - self.counter_local_messages += 1 - except: - import traceback - traceback.print_exc() + while self.alive: + # print("recv") + msg = await self.sub_sock.recv() + _msg = msg.decode("utf-8") + # print('received topic:"{}" data:"{}"'.format(topic, data)) + if _msg[0:13] == "__msgbus_meta": + await self.process_meta(_msg.split(' ', 1)[1]) + else: + # scheduling the message to be sent "soon" seems to double the performance at the cost of spamming the + # event pool. Which seems to cause weird timing slowdowns (e.g. sleeps taking longer than expected). + # If raw throughput is favorable to you over precision timing, uncomment these two lines and comment out + # the following await. + # self.loop.call_soon(asyncio.ensure_future, self.pub_sock.send(msg)) + # self.loop.call_soon(asyncio.ensure_future, self.send_to_peers(_msg)) + await asyncio.wait([self.pub_sock.send(msg), self.send_to_peers(_msg)], loop=self.loop) + self.counter_local_messages += 1 async def process_meta(self, data): print("Got meta: {}".format(data)) command, rest = data.split(" ", 1) if command == "__peer_request": - try: - await self.handle_peer_request(rest) - except: - import traceback - traceback.print_exc() + await self.handle_peer_request(rest) async def handle_peer_request(self, peer_name): assert type(peer_name) is str