
Sandglass Logo

Sandglass is a distributed, horizontally scalable, persistent, time ordered message queue. It was developed to support asynchronous tasks and message scheduling, which makes it suitable for use as a task queue.

Features

  • Horizontal scalability
  • Highly available
  • Persistent storage
  • Time ordered
  • Multiple consumers per group for a partition
  • Produce message to be consumed in the future
  • Acknowledge/NotAcknowledge each message individually
  • Automatic redelivery and commit offset tracking
  • Language agnostic

EXPERIMENTAL: This is a side-project prototype. It should not be used in production in its current form, as things may change quickly and without notice.

Installation

On MacOS using Homebrew:

brew install celrenheit/taps/sandglass

For other platforms, you can grab binaries here.

Getting started

NOTE: All data will be stored in /tmp/node1. If you wish to change this, copy demo/node1.yaml and modify it accordingly.

First, let's launch sandglass server:

sandglass --config https://raw.githubusercontent.com/sandglass/sandglass/master/demo/node1.yaml --offset_replication_factor 1

In a second terminal window, create an emails topic:

sandctl topics create emails --num_partitions 3 --replication_factor 1

...produce 10,000 messages:

sandctl produce emails '{"dest" : "[email protected]"}' -n 10000

...and consume from the emails topic:

sandctl consume emails

(or, to watch messages arrive live, use sandctl consume -f emails)

We are using a single-node cluster; this is not recommended for production.

Add a second node to the cluster:

sandglass --config https://raw.githubusercontent.com/sandglass/sandglass/master/demo/node2.yaml

and repeat the steps described above with another topic, increasing the replication factor to 2.

Motivation

As previously asked (#4), the purpose of this project might not seem clear. In short, there are two goals.

The first is to be able to track each message individually (i.e. not using a single commit offset), to make it suitable for asynchronous tasks.

The second is the ability to schedule messages to be consumed in the future. This makes it suitable for retries.

Documentation

Documentation is a work in progress and still lacking.

  • API docs
  • Overview - TODO
  • Clients development guides - TODO
  • Document the different ways of starting a cluster - TODO

Clients

Go

Installation

go get -u github.com/sandglass/sandglass-client/go/sg

Documentation

The documentation is available on godoc.

Usage

Producer
// Let's first create a client by providing addresses of nodes in the sandglass cluster
client, err := sg.NewClient(
    sg.WithAddresses(":7170"),
)
if err != nil {
    panic(err)
}
defer client.Close()

// Now we produce a new message
// Notice the empty string "" as the third argument: it lets sandglass choose a random partition
err = client.Produce(context.Background(), "emails", "", &sgproto.Message{
    Value: []byte("Hello, Sandglass!"),
})
if err != nil {
    panic(err)
}

In order to produce a message in the future, you need to specify a custom offset:

inOneHour := time.Now().Add(1 * time.Hour)
gen := sandflake.NewFixedTimeGenerator(inOneHour)

msg := &sgproto.Message{
	Offset: gen.Next(),
	Value:  []byte("Hello"),
}

err := client.ProduceMessage(context.Background(), "emails", "", msg)
if err != nil {
	return err
}

This will produce a message that will be available for consumption in 1h.

Consumer
  1. High-level
// Let's first create a client by providing addresses of nodes in the sandglass cluster
client, err := sg.NewClient(
    sg.WithAddresses(":7170"),
)
if err != nil {
    panic(err)
}
defer client.Close()

mux := sg.NewMux()
mux.SubscribeFunc("emails", func(msg *sgproto.Message) error {
    // handle message
    log.Printf("received: %s\n", string(msg.Value))
    return nil
})

m := &sg.MuxManager{
    Client:               client,
    Mux:                  mux,
    ReFetchSleepDuration: time.Second, // how long to wait before polling for new messages again
}

err = m.Start()
if err != nil {
    log.Fatal(err)
}
  2. Low-level
// Let's first create a client by providing addresses of nodes in the sandglass cluster
client, err := sg.NewClient(
    sg.WithAddresses(":7170"),
)
if err != nil {
    panic(err)
}
defer client.Close()


// Listing partitions in order to choose one to consume from
topic := "emails"
partitions, err := client.ListPartitions(context.Background(), topic)
if err != nil {
    panic(err)
}

// we are choosing only one partition for demo purposes
partition := partitions[0]

// Create a new consumer using consumer group emails-sender and consumer name consumer1
consumer := client.NewConsumer(topic, partition, "emails-sender", "consumer1")

// and consume messages
msgCh, err := consumer.Consume(context.Background())
if err != nil {
    panic(err)
}

for msg := range msgCh {
    // do an amazing amount of work
    log.Printf("received: %s\n", string(msg.Value))

    // when we are done, we Acknowledge the message
    // but we can also NotAcknowledge to trigger a redelivery of the message
    if err := consumer.Acknowledge(context.Background(), msg); err != nil {
        panic(err)
    }
}

Java, Python, Node.js, Ruby

Interested in having a client for one of the following languages?

Support is planned, but there is no specific schedule. If you would like a client in your language sooner, help is welcome!

Check the raw generated code available at https://github.com/sandglass/sandglass-grpc and feel free to submit yours through a pull request to https://github.com/sandglass/sandglass-client.

Architecture

General

general architecture illustration

Topic

There are two kinds of topics:

  • Timer:

    • Fixed number of partitions (set up-front, could change)
    • Time ordered using sandflake IDs
    • Can produce messages in the future
  • KV:

    • Fixed number of partitions (set up-front, cannot change)
    • Behaves like a distributed key value store

A topic has a number of partitions. Data is written into a single partition: either the destination partition is specified by the producer, or it is chosen using a consistent hashing algorithm.
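To make the fallback concrete, here is one common way consistent hashing can route a key to a partition: each partition is given several points on a hash ring, and a key goes to the first point clockwise from the key's own hash. This is an illustration of the general technique, not Sandglass's actual implementation:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// pickPartition sketches consistent hashing over a fixed partition count.
// Each partition contributes pointsPerPartition virtual points to a hash
// ring; a key is routed to the first point at or after its own hash,
// wrapping around the ring.
func pickPartition(key string, numPartitions, pointsPerPartition int) int {
	type point struct {
		hash      uint32
		partition int
	}
	var ring []point
	for p := 0; p < numPartitions; p++ {
		for v := 0; v < pointsPerPartition; v++ {
			h := fnv.New32a()
			fmt.Fprintf(h, "partition-%d-%d", p, v)
			ring = append(ring, point{h.Sum32(), p})
		}
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].hash < ring[j].hash })

	h := fnv.New32a()
	h.Write([]byte(key))
	kh := h.Sum32()
	// First ring point whose hash is >= the key's hash, wrapping to 0.
	i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= kh })
	if i == len(ring) {
		i = 0
	}
	return ring[i].partition
}

func main() {
	// The same key always lands on the same partition.
	fmt.Println(pickPartition("alice", 3, 16) == pickPartition("alice", 3, 16)) // true
}
```

The virtual points matter: with many points per partition, keys spread evenly, and adding or removing a partition only remaps the keys whose nearest ring point changed, instead of reshuffling everything as plain hash-mod-N would.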

Each message produced to a partition is written to a Write Ahead Log (WAL) and to the View Log (VL). The WAL is used for replication; it is sorted in the order messages were produced. The View Log is used for message consumption; it is sorted mainly by time for Timer topics (please refer to sandflake IDs for the exact composition) and by key for KV topics.

A message is composed of the following fields:

    index                       <- position in the WAL

    offset                      <- position in the view log for timer topics
    key and clusteringKey       <- position in the view log for kv topics (the key is also used for partitioning)

    value                       <- your payload

Offset Tracking

Sandglass is responsible for maintaining two offsets for each consumer group:

  • Committed: the offset below which all messages have been ACKed
  • Consumed: the last consumed message

When consuming, Sandglass scans from the last committed offset to the last consumed message to check which messages need redelivery, and from the last consumed offset to the last produced message to deliver new messages. These two scans run in parallel.

Technologies used

Contributing

Want to contribute to Sandglass? Awesome! Feel free to submit an issue or a pull request.

Here are some ways you can help:

  • Report bugs
  • Is your language not supported? Feel free to build a client. Trust me, it should not take you long :)
  • Improve code/documentation
  • Propose new features
  • and more...

License

This project is licensed under the Apache License 2.0 available here.
