nats-io / Nats.py
Licence: apache-2.0
Python3 client for NATS.io
Stars: ✭ 384
Programming Languages
Labels
Projects that are alternatives of or similar to Nats.py
Hemera
🔬 Writing reliable & fault-tolerant microservices in Node.js https://hemerajs.github.io/hemera/
Stars: ✭ 773 (+101.3%)
Mutual labels: cloud-native, nats
liftbridge-api
Protobuf definitions for the Liftbridge gRPC API. https://github.com/liftbridge-io/liftbridge
Stars: ✭ 15 (-96.09%)
Mutual labels: nats, cloud-native
Liftbridge
Lightweight, fault-tolerant message streams.
Stars: ✭ 2,175 (+466.41%)
Mutual labels: cloud-native, nats
Nats.go
Golang client for NATS, the cloud native messaging system.
Stars: ✭ 3,690 (+860.94%)
Mutual labels: cloud-native, nats
Unifiedmessagerelay
Group Message Forward Framework (supports QQ Telegram Line Discord)
Stars: ✭ 363 (-5.47%)
Mutual labels: asyncio
Asyncpgsa
A wrapper around asyncpg for use with sqlalchemy
Stars: ✭ 371 (-3.39%)
Mutual labels: asyncio
Midway Faas
🔱 A simple and lightweight serverless framework
Stars: ✭ 363 (-5.47%)
Mutual labels: cloud-native
Kubeedge
Kubernetes Native Edge Computing Framework (project under CNCF)
Stars: ✭ 4,582 (+1093.23%)
Mutual labels: cloud-native
Linstor Server
High Performance Software-Defined Block Storage for container, cloud and virtualisation. Fully integrated with Docker, Kubernetes, Openstack, Proxmox etc.
Stars: ✭ 374 (-2.6%)
Mutual labels: cloud-native
Eventmesh
EventMesh is a dynamic cloud-native eventing infrastruture used to decouple the application and backend middleware layer, which supports a wide range of use cases that encompass complex multi-cloud, widely distributed topologies using diverse technology stacks.
Stars: ✭ 356 (-7.29%)
Mutual labels: cloud-native
Picoweb
Really minimal web application framework for the Pycopy project (minimalist Python dialect) and its "uasyncio" async framework
Stars: ✭ 361 (-5.99%)
Mutual labels: asyncio
Lahja
Lahja is a generic multi process event bus implementation written in Python 3.6+ to enable lightweight inter-process communication, based on non-blocking asyncio
Stars: ✭ 374 (-2.6%)
Mutual labels: asyncio
Django Private Chat
Django one-to-one Websocket-based Asyncio-handled chat, developed by Bearle team
Stars: ✭ 376 (-2.08%)
Mutual labels: asyncio
Threatmapper
Identify vulnerabilities in running containers, images, hosts and repositories
Stars: ✭ 361 (-5.99%)
Mutual labels: cloud-native
Aiotasks
A Celery like task manager that distributes Asyncio coroutines
Stars: ✭ 375 (-2.34%)
Mutual labels: asyncio
Buildkit
concurrent, cache-efficient, and Dockerfile-agnostic builder toolkit
Stars: ✭ 4,537 (+1081.51%)
Mutual labels: cloud-native
NATS - Python3 Client for Asyncio
An asyncio Python client for the NATS messaging system.
Supported platforms
Should be compatible with at least Python +3.6.
Installing
pip install asyncio-nats-client
Starting from v0.9.0 release, you can also optionally install NKEYS in order to use the new NATS v2.0 auth features:
pip install asyncio-nats-client[nkeys]
Basic Usage
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
await nc.connect("demo.nats.io:4222", loop=loop)
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Simple publisher and async subscriber via coroutine.
sid = await nc.subscribe("foo", cb=message_handler)
# Stop receiving after 2 messages.
await nc.auto_unsubscribe(sid, 2)
await nc.publish("foo", b'Hello')
await nc.publish("foo", b'World')
await nc.publish("foo", b'!!!!!')
async def help_request(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
await nc.publish(reply, b'I can help')
# Use queue named 'workers' for distributing requests
# among subscribers.
sid = await nc.subscribe("help", "workers", help_request)
# Send a request and expect a single response
# and trigger timeout if not faster than 1 second.
try:
response = await nc.request("help", b'help me', timeout=1)
print("Received response: {message}".format(
message=response.data.decode()))
except ErrTimeout:
print("Request timed out")
# Remove interest in subscription.
await nc.unsubscribe(sid)
# Terminate connection to NATS.
await nc.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
Wildcard Subscriptions
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
await nc.connect("nats://127.0.0.1:4222", loop=loop)
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# "*" matches any token, at any level of the subject.
await nc.subscribe("foo.*.baz", cb=message_handler)
await nc.subscribe("foo.bar.*", cb=message_handler)
# ">" matches any length of the tail of a subject, and can only be the last token
# E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
await nc.subscribe("foo.>", cb=message_handler)
# Matches all of the above.
await nc.publish("foo.bar.baz", b'Hello World')
# Gracefully close the connection.
await nc.drain()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
Advanced Usage
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
try:
# Setting explicit list of servers in a cluster.
await nc.connect(servers=["nats://127.0.0.1:4222", "nats://127.0.0.1:4223", "nats://127.0.0.1:4224"], loop=loop)
except ErrNoServers as e:
print(e)
return
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
for i in range(0, 20):
await nc.publish(reply, "i={i}".format(i=i).encode())
await nc.subscribe("help.>", cb=message_handler)
async def request_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Signal the server to stop sending messages after we got 10 already.
await nc.request(
"help.please", b'help', expected=10, cb=request_handler)
try:
# Flush connection to server, returns when all messages have been processed.
# It raises a timeout if roundtrip takes longer than 1 second.
await nc.flush(1)
except ErrTimeout:
print("Flush timeout")
await asyncio.sleep(1, loop=loop)
# Drain gracefully closes the connection, allowing all subscribers to
# handle any pending messages inflight that the server may have sent.
await nc.drain()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
Clustered Usage
import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
# Setup pool of servers from a NATS cluster.
options = {
"servers": [
"nats://user1:[email protected]:4222",
"nats://user2:[email protected]:4223",
"nats://user3:[email protected]:4224",
],
"loop": loop,
}
# Will try to connect to servers in order of configuration,
# by defaults it connect to one in the pool randomly.
options["dont_randomize"] = True
# Optionally set reconnect wait and max reconnect attempts.
# This example means 10 seconds total per backend.
options["max_reconnect_attempts"] = 5
options["reconnect_time_wait"] = 2
async def disconnected_cb():
print("Got disconnected!")
async def reconnected_cb():
# See who we are connected to on reconnect.
print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
# Setup callbacks to be notified on disconnects and reconnects
options["disconnected_cb"] = disconnected_cb
options["reconnected_cb"] = reconnected_cb
async def error_cb(e):
print("There was an error: {}".format(e))
async def closed_cb():
print("Connection is closed")
async def subscribe_handler(msg):
print("Got message: ", msg.subject, msg.reply, msg.data)
# Setup callbacks to be notified when there is an error
# or connection is closed.
options["error_cb"] = error_cb
options["closed_cb"] = closed_cb
try:
await nc.connect(**options)
except ErrNoServers as e:
# Could not connect to any server in the cluster.
print(e)
return
if nc.is_connected:
await nc.subscribe("help.*", cb=subscribe_handler)
max_messages = 1000
start_time = datetime.now()
print("Sending {} messages to NATS...".format(max_messages))
for i in range(0, max_messages):
try:
await nc.publish("help.{}".format(i), b'A')
await nc.flush(0.500)
except ErrConnectionClosed as e:
print("Connection closed prematurely.")
break
except ErrTimeout as e:
print("Timeout occured when publishing msg i={}: {}".format(
i, e))
end_time = datetime.now()
await nc.close()
duration = end_time - start_time
print("Duration: {}".format(duration))
try:
await nc.publish("help", b"hello world")
except ErrConnectionClosed:
print("Can't publish since no longer connected.")
err = nc.last_error
if err is not None:
print("Last Error: {}".format(err))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
TLS
TLS connections can be configured with an ssl context
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
keyfile='client-key.pem')
await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx, tls_hostname="localhost")
Setting the scheme to tls
in the connect URL will make the client create a default ssl context automatically:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run(loop):
nc = NATS()
await nc.connect("tls://demo.nats.io:4443", loop=loop)
Note: If getting SSL certificate errors in OS X, try first installing the certifi
certificate bundle. If using Python 3.7 for example, then run:
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
...
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update complete
Development
To run the tests:
python3 -m pipenv install
python3 -m pytest
License
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
Note that the project description data, including the texts, logos, images, and/or trademarks,
for each open source project belongs to its rightful owner.
If you wish to add or remove any projects, please contact us at [email protected].