nats-io / Nats.py

Licence: apache-2.0
Python3 client for NATS.io

NATS - Python3 Client for Asyncio

An asyncio Python client for the NATS messaging system.

Supported platforms

Should be compatible with Python 3.6 and later.

Installing

pip install asyncio-nats-client

Starting with the v0.9.0 release, you can optionally install NKEYS support in order to use the NATS v2.0 auth features:

pip install asyncio-nats-client[nkeys]

Basic Usage

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    await nc.connect("demo.nats.io:4222", loop=loop)

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sid = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await nc.auto_unsubscribe(sid, 2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sid = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response;
    # time out if it takes longer than 1 second.
    try:
        response = await nc.request("help", b'help me', timeout=1)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except ErrTimeout:
        print("Request timed out")

    # Remove interest in subscription.
    await nc.unsubscribe(sid)

    # Terminate connection to NATS.
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Wildcard Subscriptions

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers


async def run(loop):
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # "*" matches any single token, at any level of the subject.
    await nc.subscribe("foo.*.baz", cb=message_handler)
    await nc.subscribe("foo.bar.*", cb=message_handler)

    # ">" matches any length of the tail of a subject, and can only be the last token
    # E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
    await nc.subscribe("foo.>", cb=message_handler)

    # Matches all of the above.
    await nc.publish("foo.bar.baz", b'Hello World')

    # Gracefully close the connection.
    await nc.drain()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()
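
The matching rules described in the comments above can be sketched in plain Python, without a server. This is only an illustration: `subject_matches` is a hypothetical helper, not part of the client, and it assumes well-formed, dot-separated subjects in which `*` stands for exactly one token and `>` stands for one or more trailing tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style wildcard `pattern`.

    Hypothetical helper for illustration; the server does the real matching.
    """
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" is only valid as the last token and needs at least
            # one remaining subject token to match.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            # The pattern is longer than the subject.
            return False
        if token != "*" and token != subject_tokens[i]:
            # A literal token has to match exactly; "*" matches any one token.
            return False

    # Without ">", both sides must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)


if __name__ == "__main__":
    for pattern in ("foo.*.baz", "foo.bar.*", "foo.>"):
        print(pattern, subject_matches(pattern, "foo.bar.baz"))
```

Under these assumptions, all three subscriptions in the example above match `foo.bar.baz`, while `foo.>` does not match the bare subject `foo`.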

Advanced Usage

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    try:
        # Setting explicit list of servers in a cluster.
        await nc.connect(servers=["nats://127.0.0.1:4222", "nats://127.0.0.1:4223", "nats://127.0.0.1:4224"], loop=loop)
    except ErrNoServers as e:
        print(e)
        return

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        for i in range(0, 20):
            await nc.publish(reply, "i={i}".format(i=i).encode())

    await nc.subscribe("help.>", cb=message_handler)

    async def request_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Ask the server to stop sending messages once 10 have been received.
    await nc.request(
        "help.please", b'help', expected=10, cb=request_handler)

    try:
        # Flush connection to server, returns when all messages have been processed.
        # It raises a timeout if roundtrip takes longer than 1 second.
        await nc.flush(1)
    except ErrTimeout:
        print("Flush timeout")

    # The loop argument of asyncio.sleep was removed in Python 3.10.
    await asyncio.sleep(1)

    # Drain gracefully closes the connection, allowing all subscribers to
    # handle any pending messages inflight that the server may have sent.
    await nc.drain()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Clustered Usage

import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):

    nc = NATS()

    # Setup pool of servers from a NATS cluster.
    options = {
        "servers": [
            "nats://user1:[email protected]:4222",
            "nats://user2:[email protected]:4223",
            "nats://user3:[email protected]:4224",
        ],
        "loop": loop,
    }

    # Try to connect to servers in the configured order;
    # by default the client picks one from the pool at random.
    options["dont_randomize"] = True

    # Optionally set reconnect wait and max reconnect attempts.
    # This example means 10 seconds total per backend.
    options["max_reconnect_attempts"] = 5
    options["reconnect_time_wait"] = 2

    async def disconnected_cb():
        print("Got disconnected!")

    async def reconnected_cb():
        # See who we are connected to on reconnect.
        print("Got reconnected to {url}".format(url=nc.connected_url.netloc))

    # Setup callbacks to be notified on disconnects and reconnects
    options["disconnected_cb"] = disconnected_cb
    options["reconnected_cb"] = reconnected_cb

    async def error_cb(e):
        print("There was an error: {}".format(e))

    async def closed_cb():
        print("Connection is closed")

    async def subscribe_handler(msg):
        print("Got message: ", msg.subject, msg.reply, msg.data)

    # Setup callbacks to be notified when there is an error
    # or connection is closed.
    options["error_cb"] = error_cb
    options["closed_cb"] = closed_cb

    try:
        await nc.connect(**options)
    except ErrNoServers as e:
        # Could not connect to any server in the cluster.
        print(e)
        return

    if nc.is_connected:
        await nc.subscribe("help.*", cb=subscribe_handler)

        max_messages = 1000
        start_time = datetime.now()
        print("Sending {} messages to NATS...".format(max_messages))

        for i in range(0, max_messages):
            try:
                await nc.publish("help.{}".format(i), b'A')
                await nc.flush(0.500)
            except ErrConnectionClosed as e:
                print("Connection closed prematurely.")
                break
            except ErrTimeout as e:
                print("Timeout occurred when publishing msg i={}: {}".format(
                    i, e))

        end_time = datetime.now()
        await nc.close()
        duration = end_time - start_time
        print("Duration: {}".format(duration))

        try:
            await nc.publish("help", b"hello world")
        except ErrConnectionClosed:
            print("Can't publish since no longer connected.")

    err = nc.last_error
    if err is not None:
        print("Last Error: {}".format(err))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()
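
The "10 seconds total per backend" figure in the comments above is simply the product of the two reconnect options. A minimal sketch of that arithmetic follows; `reconnect_budget_seconds` is a hypothetical helper, and it assumes each failed attempt takes no time beyond the configured wait.

```python
def reconnect_budget_seconds(max_reconnect_attempts: int,
                             reconnect_time_wait: float) -> float:
    """Approximate time spent waiting on one server before giving up on it.

    Hypothetical helper: ignores the time each connection attempt itself takes.
    """
    return max_reconnect_attempts * reconnect_time_wait


# Same values as in the options dict of the example above.
options = {"max_reconnect_attempts": 5, "reconnect_time_wait": 2}
budget = reconnect_budget_seconds(
    options["max_reconnect_attempts"], options["reconnect_time_wait"])
print("Reconnect budget per server: {} seconds".format(budget))
```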

TLS

TLS connections can be configured with an ssl context:

import ssl

ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx, tls_hostname="localhost")
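
If the certificate paths vary between environments, the context setup can be wrapped in a small function. The following is a sketch only: `make_tls_context` and its parameters are hypothetical names, and it uses nothing beyond the standard library `ssl` calls already shown above.

```python
import ssl
from typing import Optional


def make_tls_context(ca_file: Optional[str] = None,
                     certfile: Optional[str] = None,
                     keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a client-side SSL context (hypothetical helper).

    With no arguments, the context trusts the system CA bundle.
    """
    ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    if ca_file is not None:
        # Trust a private CA, e.g. for a self-signed server certificate.
        ctx.load_verify_locations(ca_file)
    if certfile is not None:
        # Present a client certificate when the server asks for one.
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx


ctx = make_tls_context()
# The default context verifies the server certificate and its hostname.
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

The returned context would then be passed as the `tls` argument of `nc.connect`, as in the snippet above.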

Setting the scheme to tls in the connect URL will make the client create a default ssl context automatically:

import asyncio
import ssl
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443", loop=loop)
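
To see which parts of such a URL are available to a client, the standard library parser can be used directly. This sketch is not the client's own logic, which may differ; `describe_server` is a hypothetical helper, and it assumes the URL always carries an explicit scheme such as `nats://` or `tls://`.

```python
from urllib.parse import urlparse


def describe_server(url: str) -> dict:
    """Split a server URL into the parts relevant for connecting.

    Hypothetical helper; assumes the URL includes a scheme.
    """
    parsed = urlparse(url)
    return {
        "scheme": parsed.scheme,
        "host": parsed.hostname,
        "port": parsed.port,
        # Assumption for illustration: only the "tls" scheme implies TLS.
        "secure": parsed.scheme == "tls",
    }


print(describe_server("tls://demo.nats.io:4443"))
print(describe_server("nats://127.0.0.1:4222"))
```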

Note: If you get SSL certificate errors on OS X, first try installing the certifi certificate bundle. For example, with Python 3.7, run:

$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete

Development

To run the tests:

python3 -m pipenv install
python3 -m pytest 

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
