76 lines
2.8 KiB
Python
76 lines
2.8 KiB
Python
import asyncio
|
|
import zmq.asyncio
|
|
from time import time
|
|
from contextlib import closing
|
|
# from concurrent.futures import ThreadPoolExecutor
|
|
from random import randint
|
|
zmq.asyncio.install()
|
|
|
|
|
|
async def perftest_sender(ctx, port, num_messages, data):
|
|
with closing(ctx.socket(zmq.PUB)) as pub_sock:
|
|
pub_sock.connect("tcp://127.0.0.1:{}".format(port))
|
|
await asyncio.sleep(5)
|
|
start = time()
|
|
total = num_messages
|
|
while total > 0:
|
|
total -= 1
|
|
await pub_sock.send(data)
|
|
if total % 1000 == 0:
|
|
print("to send:", total)
|
|
duration = round(time() - start, 6)
|
|
print("Send {} messages in {}s".format(num_messages, duration))
|
|
print("{} m/s!".format(round(num_messages / duration, 2)))
|
|
|
|
|
|
async def perftest_recver(ctx, port, num_messages, topic):
|
|
try:
|
|
with closing(ctx.socket(zmq.SUB)) as sub_sock:
|
|
sub_sock.connect("tcp://127.0.0.1:{}".format(port))
|
|
sub_sock.subscribe(topic)
|
|
|
|
start = None
|
|
total = num_messages
|
|
recv_bytes = 0
|
|
print("recver waiting")
|
|
while total > 0:
|
|
message = await sub_sock.recv()
|
|
recv_bytes += len(message)
|
|
if not start: # start timer when we get the first message
|
|
start = time()
|
|
if total % 1000 == 0:
|
|
print("to recv:", total)
|
|
total -= 1
|
|
duration = round(time() - start, 6)
|
|
print("Recv'd {} kbytes in {} messages in {}s".format(round(recv_bytes / 1000), num_messages, duration))
|
|
print("{} m/s!".format(round(num_messages / duration, 2)))
|
|
except:
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description="msgbus performance test")
|
|
parser.add_argument("-p", "--send-port", default=7003, help="port to send to")
|
|
parser.add_argument("-r", "--recv-port", default=7004, help="port to recv to")
|
|
parser.add_argument("-m", "--messages", type=int, default=10000, help="number of messages to send")
|
|
parser.add_argument("-t", "--topic", default="perftest")
|
|
parser.add_argument("--message", default="message", help="content to send")
|
|
args = parser.parse_args()
|
|
|
|
with zmq.asyncio.Context() as ctx:
|
|
loop = asyncio.get_event_loop()
|
|
# loop.set_default_executor(ThreadPoolExecutor(max_workers=10))
|
|
loop.set_debug(True)
|
|
message = "{} {}".format(args.topic, args.message).encode("utf-8")
|
|
topic = args.topic.encode("utf-8")
|
|
loop.run_until_complete(asyncio.wait([perftest_recver(ctx, args.recv_port, args.messages, topic),
|
|
perftest_sender(ctx, args.send_port, args.messages, message)]))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
|