From 9fec6097150a166fa5fc2bf9de34dbcbc7fd643f Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 17 Sep 2017 15:13:43 -0700 Subject: [PATCH] add readme and examples --- README.md | 64 +++++++++++++++++++++++++++++++++++ examples/perftest.py | 75 ++++++++++++++++++++++++++++++++++++++++++ examples/testclient.py | 16 +++++++++ msgbus/client.py | 1 - 4 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 README.md create mode 100644 examples/perftest.py create mode 100644 examples/testclient.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..217e181 --- /dev/null +++ b/README.md @@ -0,0 +1,64 @@ +pymsgbus +======== +**Simple zeromq-based pubsub client and server** + + +Quick Start +----------- + +* Install: `python3 setup.py install` +* Run server: `mbusd -p 7100` +* Connect a peer server: `mbusd -p 7200 --peer 127.0.0.1:7100` +* Dump events: `mbussub -p 7100 --channel orderbook` +* Send events: `mbuspub -p 7200 -c orderbook -m foo bar baz` + + +About +----- + +Pymsgbus is server/client software for passing messages in a +[pub-sub](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern. It is built on top of +[pyzmq](http://pyzmq.readthedocs.io/en/latest/). This means you can use it with the native pymsgbus client or raw zeromq +sockets. See `msgbus/pub.py` and `msgbus/sub.py` for examples. + + +Examples +-------- + +Given the two peered servers created in Quick Start above, and these imports: + + +```python +from contextlib import closing +from msgbus.client import MsgbusSubClient +``` + +This code would print out messages from the "orderbook" channel: + +```python +with closing(MsgbusSubClient(host, port)) as client: + client.sub("orderbook") + while True: + channel, message = client.recv() + print("On channel '{}' I got '{}'".format(channel, message)) +``` + +And this code would send a message on the orderbook channel: + +``` +with closing(MsgbusSubClient(host, port)) as client: + for message in messages: + client.pub(channel, message) +``` + +Because pymsgbus is built on [ZeroMQ](http://zeromq.org/), you don't have to start the clients or servers in any +specific order. You can start the lister client above and then the servers, and ZeroMQ will handle sorting things out. +Of course, messages published when there's no server or no listeners are silently list. + + +TODO +---- + +* Remove many 127.0.0.1 hardcodings +* Improve performance +* Create docs diff --git a/examples/perftest.py b/examples/perftest.py new file mode 100644 index 0000000..bf8a542 --- /dev/null +++ b/examples/perftest.py @@ -0,0 +1,75 @@ +import asyncio +import zmq.asyncio +from time import time +from contextlib import closing +# from concurrent.futures import ThreadPoolExecutor +from random import randint +zmq.asyncio.install() + + +async def perftest_sender(ctx, port, num_messages, data): + with closing(ctx.socket(zmq.PUB)) as pub_sock: + pub_sock.connect("tcp://127.0.0.1:{}".format(port)) + await asyncio.sleep(5) + start = time() + total = num_messages + while total > 0: + total -= 1 + await pub_sock.send(data) + if total % 1000 == 0: + print("to send:", total) + duration = round(time() - start, 6) + print("Send {} messages in {}s".format(num_messages, duration)) + print("{} m/s!".format(round(num_messages / duration, 2))) + + +async def perftest_recver(ctx, port, num_messages, topic): + try: + with closing(ctx.socket(zmq.SUB)) as sub_sock: + sub_sock.connect("tcp://127.0.0.1:{}".format(port)) + sub_sock.subscribe(topic) + + start = None + total = num_messages + recv_bytes = 0 + print("recver waiting") + while total > 0: + message = await sub_sock.recv() + recv_bytes += len(message) + if not start: # start timer when we get the first message + start = time() + if total % 1000 == 0: + print("to recv:", total) + total -= 1 + duration = round(time() - start, 6) + print("Recv'd {} kbytes in {} messages in {}s".format(round(recv_bytes / 1000), num_messages, duration)) + print("{} m/s!".format(round(num_messages / duration, 2))) + except: + import traceback + traceback.print_exc() + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="msgbus performance test") + parser.add_argument("-p", "--send-port", default=7003, help="port to send to") + parser.add_argument("-r", "--recv-port", default=7004, help="port to recv to") + parser.add_argument("-m", "--messages", type=int, default=10000, help="number of messages to send") + parser.add_argument("-t", "--topic", default="perftest") + parser.add_argument("--message", default="message", help="content to send") + args = parser.parse_args() + + with zmq.asyncio.Context() as ctx: + loop = asyncio.get_event_loop() + # loop.set_default_executor(ThreadPoolExecutor(max_workers=10)) + loop.set_debug(True) + message = "{} {}".format(args.topic, args.message).encode("utf-8") + topic = args.topic.encode("utf-8") + loop.run_until_complete(asyncio.wait([perftest_recver(ctx, args.recv_port, args.messages, topic), + perftest_sender(ctx, args.send_port, args.messages, message)])) + + +if __name__ == '__main__': + main() + + diff --git a/examples/testclient.py b/examples/testclient.py new file mode 100644 index 0000000..5d7f7ad --- /dev/null +++ b/examples/testclient.py @@ -0,0 +1,16 @@ +from msgbus.client import MsgbusSubClient + + +def main(): + c = MsgbusSubClient("127.0.0.1", 7200) + + c.sub("orderbook") + + print(c.recv()) # returns (channel, message) + + c.pub("hello_world", "asdf1234") + c.pub("hello_world", "qwer5678") + + +if __name__ == '__main__': + main() diff --git a/msgbus/client.py b/msgbus/client.py index e1311f1..b4f8126 100644 --- a/msgbus/client.py +++ b/msgbus/client.py @@ -54,7 +54,6 @@ class MsgbusSubClient(object): assert type(channel) is str if not self.sub_socket: self.connect_sub(self.host, self.port) - print("subbin to", channel) self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8")) self.subscriptions.append(channel)