Performance tweaks
This commit is contained in:
parent
f0b68ee0b3
commit
b0e329aa63
|
@ -95,26 +95,18 @@ class MsgBusServerPeer(object):
|
|||
|
||||
async def keep_alive_sender(self):
|
||||
interval = self.server.conf.get("peer_keepalive_interval", 1)
|
||||
try:
|
||||
while self.alive:
|
||||
await self.pub_sock.send("__peer_keepalive ping".encode('utf-8'))
|
||||
await asyncio.sleep(interval)
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
while self.alive:
|
||||
await self.pub_sock.send("__peer_keepalive ping".encode('utf-8'))
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
async def keep_alive_listener(self):
|
||||
interval = self.server.conf.get("peer_keepalive_timeout", 5)
|
||||
try:
|
||||
while self.alive:
|
||||
if time() - self.last_keepalive > interval:
|
||||
print("Peer {} is lost!".format(self.name))
|
||||
self.server.disconnect_peer(self.name)
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
while self.alive:
|
||||
if time() - self.last_keepalive > interval:
|
||||
print("Peer {} is lost!".format(self.name))
|
||||
self.server.disconnect_peer(self.name)
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def shutdown(self):
|
||||
self.alive = False
|
||||
|
@ -175,6 +167,7 @@ class MsgBusServer(object):
|
|||
Print out stats on an interval (messages/s etc)
|
||||
"""
|
||||
start = time()
|
||||
last = start - 1
|
||||
interval = self.conf.get("stats_interval", 5)
|
||||
show_idle_stats = self.conf.get("show_idle_stats", True)
|
||||
counter_local_total = 0
|
||||
|
@ -182,14 +175,17 @@ class MsgBusServer(object):
|
|||
while self.alive:
|
||||
await asyncio.sleep(interval)
|
||||
if show_idle_stats or self.counter_local_messages > 0 or self.counter_remote_messages > 0:
|
||||
_interval = round(time() - last, 2)
|
||||
counter_local_total += self.counter_local_messages
|
||||
counter_remote_total += self.counter_remote_messages
|
||||
print("Last {}s i delivered {} messages locally ({}/s)"
|
||||
.format(interval, self.counter_local_messages, round(self.counter_local_messages / interval, 2)))
|
||||
.format(_interval, self.counter_local_messages,
|
||||
round(self.counter_local_messages / _interval, 2)))
|
||||
print("Last {}s i delivered {} messages remotely ({}/s)"
|
||||
.format(interval, self.counter_remote_messages, round(self.counter_remote_messages / interval, 2)))
|
||||
.format(_interval, self.counter_remote_messages,
|
||||
round(self.counter_remote_messages / _interval, 2)))
|
||||
total = self.counter_local_messages + self.counter_remote_messages
|
||||
print("Last {}s total {} messages ({}/s)".format(interval, total, round(total / interval, 2)))
|
||||
print("Last {}s total {} messages ({}/s)".format(_interval, total, round(total / _interval, 2)))
|
||||
uptime = round(time() - start, 2)
|
||||
print("Lifetime {}s i delivered {} messages locally ({}/s)"
|
||||
.format(uptime, counter_local_total, round(counter_local_total / uptime, 2)))
|
||||
|
@ -199,8 +195,8 @@ class MsgBusServer(object):
|
|||
print("Lifetime {}s total {} messages ({}/s)\n".format(uptime, total, round(total / uptime, 2)))
|
||||
self.counter_local_messages = 0
|
||||
self.counter_remote_messages = 0
|
||||
last = time()
|
||||
|
||||
@exewrap
|
||||
async def peer_monitor(self):
|
||||
# ensure we stay connected to peers
|
||||
while self.alive:
|
||||
|
@ -217,7 +213,6 @@ class MsgBusServer(object):
|
|||
return True
|
||||
return False
|
||||
|
||||
@exewrap
|
||||
async def connect_to_peer(self, peer_name):
|
||||
try:
|
||||
self.inprogress_connects.append(peer_name)
|
||||
|
@ -277,39 +272,35 @@ class MsgBusServer(object):
|
|||
Send heartbeat messages every second. These messages, all under the __msgbus_meta topic, include:
|
||||
* __my_ports: a listing of this nodes pub and sub ports and protocols
|
||||
"""
|
||||
heartbeat = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol,
|
||||
self.pubport, self.protocol).encode('utf-8')
|
||||
while self.alive:
|
||||
msg = '__msgbus_meta __my_info {} {} {} {} {}'.format(self.name, self.subport, self.protocol,
|
||||
self.pubport, self.protocol)
|
||||
self.pub_sock.send(msg.encode('utf-8'))
|
||||
self.pub_sock.send(heartbeat)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def reciever(self):
|
||||
try:
|
||||
while self.alive:
|
||||
# print("recv")
|
||||
msg = await self.sub_sock.recv()
|
||||
msg = msg.decode("utf-8")
|
||||
topic, data = msg.split(' ', 1)
|
||||
# print('received topic:"{}" data:"{}"'.format(topic, data))
|
||||
if topic == "__msgbus_meta":
|
||||
await self.process_meta(data)
|
||||
else:
|
||||
await asyncio.wait([self.pub_sock.send(msg.encode("utf-8")), self.send_to_peers(msg)],
|
||||
loop=self.loop)
|
||||
self.counter_local_messages += 1
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
while self.alive:
|
||||
# print("recv")
|
||||
msg = await self.sub_sock.recv()
|
||||
_msg = msg.decode("utf-8")
|
||||
# print('received topic:"{}" data:"{}"'.format(topic, data))
|
||||
if _msg[0:13] == "__msgbus_meta":
|
||||
await self.process_meta(_msg.split(' ', 1)[1])
|
||||
else:
|
||||
# scheduling the message to be sent "soon" seems to double the performance at the cost of spamming the
|
||||
# event pool. Which seems to cause weird timing slowdowns (e.g. sleeps taking longer than expected).
|
||||
# If raw throughput is favorable to you over precision timing, uncomment these two lines and comment out
|
||||
# the following await.
|
||||
# self.loop.call_soon(asyncio.ensure_future, self.pub_sock.send(msg))
|
||||
# self.loop.call_soon(asyncio.ensure_future, self.send_to_peers(_msg))
|
||||
await asyncio.wait([self.pub_sock.send(msg), self.send_to_peers(_msg)], loop=self.loop)
|
||||
self.counter_local_messages += 1
|
||||
|
||||
async def process_meta(self, data):
|
||||
print("Got meta: {}".format(data))
|
||||
command, rest = data.split(" ", 1)
|
||||
if command == "__peer_request":
|
||||
try:
|
||||
await self.handle_peer_request(rest)
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
await self.handle_peer_request(rest)
|
||||
|
||||
async def handle_peer_request(self, peer_name):
|
||||
assert type(peer_name) is str
|
||||
|
|
Loading…
Reference in New Issue