add readme and examples
This commit is contained in:
parent
fffba8beb9
commit
9fec609715
|
@ -0,0 +1,64 @@
|
||||||
|
pymsgbus
|
||||||
|
========
|
||||||
|
**Simple zeromq-based pubsub client and server**
|
||||||
|
|
||||||
|
|
||||||
|
Quick Start
|
||||||
|
-----------
|
||||||
|
|
||||||
|
* Install: `python3 setup.py install`
|
||||||
|
* Run server: `mbusd -p 7100`
|
||||||
|
* Connect a peer server: `mbusd -p 7200 --peer 127.0.0.1:7100`
|
||||||
|
* Dump events: `mbussub -p 7100 --channel orderbook`
|
||||||
|
* Send events: `mbuspub -p 7200 -c orderbook -m foo bar baz`
|
||||||
|
|
||||||
|
|
||||||
|
About
|
||||||
|
-----
|
||||||
|
|
||||||
|
Pymsgbus is server/client software for passing messages in a
|
||||||
|
[pub-sub](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern. It is built on top of
|
||||||
|
[pyzmq](http://pyzmq.readthedocs.io/en/latest/). This means you can use it with the native pymsgbus client or raw zeromq
|
||||||
|
sockets. See `msgbus/pub.py` and `msgbus/sub.py` for examples.
|
||||||
|
|
||||||
|
|
||||||
|
Examples
|
||||||
|
--------
|
||||||
|
|
||||||
|
Given the two peered servers created in Quick Start above, and these imports:
|
||||||
|
|
||||||
|
|
||||||
|
```python
|
||||||
|
from contextlib import closing
|
||||||
|
from msgbus.client import MsgbusSubClient
|
||||||
|
```
|
||||||
|
|
||||||
|
This code would print out messages from the "orderbook" channel:
|
||||||
|
|
||||||
|
```python
|
||||||
|
with closing(MsgbusSubClient(host, port)) as client:
|
||||||
|
client.sub("orderbook")
|
||||||
|
while True:
|
||||||
|
channel, message = client.recv()
|
||||||
|
print("On channel '{}' I got '{}'".format(channel, message))
|
||||||
|
```
|
||||||
|
|
||||||
|
And this code would send a message on the orderbook channel:
|
||||||
|
|
||||||
|
```
|
||||||
|
with closing(MsgbusSubClient(host, port)) as client:
|
||||||
|
for message in messages:
|
||||||
|
client.pub(channel, message)
|
||||||
|
```
|
||||||
|
|
||||||
|
Because pymsgbus is built on [ZeroMQ](http://zeromq.org/), you don't have to start the clients or servers in any
|
||||||
|
specific order. You can start the lister client above and then the servers, and ZeroMQ will handle sorting things out.
|
||||||
|
Of course, messages published when there's no server or no listeners are silently list.
|
||||||
|
|
||||||
|
|
||||||
|
TODO
|
||||||
|
----
|
||||||
|
|
||||||
|
* Remove many 127.0.0.1 hardcodings
|
||||||
|
* Improve performance
|
||||||
|
* Create docs
|
|
@ -0,0 +1,75 @@
|
||||||
|
import asyncio
|
||||||
|
import zmq.asyncio
|
||||||
|
from time import time
|
||||||
|
from contextlib import closing
|
||||||
|
# from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from random import randint
|
||||||
|
zmq.asyncio.install()
|
||||||
|
|
||||||
|
|
||||||
|
async def perftest_sender(ctx, port, num_messages, data):
|
||||||
|
with closing(ctx.socket(zmq.PUB)) as pub_sock:
|
||||||
|
pub_sock.connect("tcp://127.0.0.1:{}".format(port))
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
start = time()
|
||||||
|
total = num_messages
|
||||||
|
while total > 0:
|
||||||
|
total -= 1
|
||||||
|
await pub_sock.send(data)
|
||||||
|
if total % 1000 == 0:
|
||||||
|
print("to send:", total)
|
||||||
|
duration = round(time() - start, 6)
|
||||||
|
print("Send {} messages in {}s".format(num_messages, duration))
|
||||||
|
print("{} m/s!".format(round(num_messages / duration, 2)))
|
||||||
|
|
||||||
|
|
||||||
|
async def perftest_recver(ctx, port, num_messages, topic):
|
||||||
|
try:
|
||||||
|
with closing(ctx.socket(zmq.SUB)) as sub_sock:
|
||||||
|
sub_sock.connect("tcp://127.0.0.1:{}".format(port))
|
||||||
|
sub_sock.subscribe(topic)
|
||||||
|
|
||||||
|
start = None
|
||||||
|
total = num_messages
|
||||||
|
recv_bytes = 0
|
||||||
|
print("recver waiting")
|
||||||
|
while total > 0:
|
||||||
|
message = await sub_sock.recv()
|
||||||
|
recv_bytes += len(message)
|
||||||
|
if not start: # start timer when we get the first message
|
||||||
|
start = time()
|
||||||
|
if total % 1000 == 0:
|
||||||
|
print("to recv:", total)
|
||||||
|
total -= 1
|
||||||
|
duration = round(time() - start, 6)
|
||||||
|
print("Recv'd {} kbytes in {} messages in {}s".format(round(recv_bytes / 1000), num_messages, duration))
|
||||||
|
print("{} m/s!".format(round(num_messages / duration, 2)))
|
||||||
|
except:
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import argparse
|
||||||
|
parser = argparse.ArgumentParser(description="msgbus performance test")
|
||||||
|
parser.add_argument("-p", "--send-port", default=7003, help="port to send to")
|
||||||
|
parser.add_argument("-r", "--recv-port", default=7004, help="port to recv to")
|
||||||
|
parser.add_argument("-m", "--messages", type=int, default=10000, help="number of messages to send")
|
||||||
|
parser.add_argument("-t", "--topic", default="perftest")
|
||||||
|
parser.add_argument("--message", default="message", help="content to send")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
with zmq.asyncio.Context() as ctx:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
# loop.set_default_executor(ThreadPoolExecutor(max_workers=10))
|
||||||
|
loop.set_debug(True)
|
||||||
|
message = "{} {}".format(args.topic, args.message).encode("utf-8")
|
||||||
|
topic = args.topic.encode("utf-8")
|
||||||
|
loop.run_until_complete(asyncio.wait([perftest_recver(ctx, args.recv_port, args.messages, topic),
|
||||||
|
perftest_sender(ctx, args.send_port, args.messages, message)]))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
from msgbus.client import MsgbusSubClient
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
c = MsgbusSubClient("127.0.0.1", 7200)
|
||||||
|
|
||||||
|
c.sub("orderbook")
|
||||||
|
|
||||||
|
print(c.recv()) # returns (channel, message)
|
||||||
|
|
||||||
|
c.pub("hello_world", "asdf1234")
|
||||||
|
c.pub("hello_world", "qwer5678")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
|
@ -54,7 +54,6 @@ class MsgbusSubClient(object):
|
||||||
assert type(channel) is str
|
assert type(channel) is str
|
||||||
if not self.sub_socket:
|
if not self.sub_socket:
|
||||||
self.connect_sub(self.host, self.port)
|
self.connect_sub(self.host, self.port)
|
||||||
print("subbin to", channel)
|
|
||||||
self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8"))
|
self.sub_socket.setsockopt(zmq.SUBSCRIBE, channel.encode("utf-8"))
|
||||||
self.subscriptions.append(channel)
|
self.subscriptions.append(channel)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue