i-e-b / Diskqueue

Licence: other
A thread-safe, multi-process(ish) persistent queue library for .Net and Mono

DiskQueue

A thread-safe, multi-process(ish) persistent queue, based very heavily on http://ayende.com/blog/3479/rhino-queues-storage-disk

Requirements and Environment

Works on .Net 4+ and Mono 2.10.8+ (3.0.6+ recommended)

Requires access to filesystem storage

Basic Usage

  • PersistentQueue.WaitFor(...) is the main entry point. This will attempt to gain an exclusive lock on the given storage location. On first use, a directory will be created with the required files inside it.
  • This queue object can be shared among threads. Each thread should call OpenSession() to get its own session object.
  • Both IPersistentQueue and IPersistentQueueSession instances should be wrapped in using() blocks, or otherwise disposed of properly.
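
The three points above can be sketched as a single round trip. This is a minimal sketch rather than canonical usage: it assumes the DiskQueue assembly is referenced and that its types live in the DiskQueue namespace. The names BasicUsageSketch and RoundTrip and the UTF-8 string encoding are hypothetical choices made for illustration. The queue calls themselves are only those that appear elsewhere in this document.

```csharp
using System;
using System.Text;
using DiskQueue; // assumed namespace of the DiskQueue library

static class BasicUsageSketch // hypothetical name, for illustration only
{
    // Enqueue one message and read it back. Assumes storagePath is not in use
    // by another queue instance and holds no items from an earlier run.
    public static string RoundTrip(string storagePath, string message)
    {
        // Take the exclusive lock on the storage location, waiting up to 5 seconds.
        using (var queue = PersistentQueue.WaitFor(storagePath, TimeSpan.FromSeconds(5)))
        {
            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes(message));
                session.Flush(); // commit, so other sessions can see the item
            }

            using (var session = queue.OpenSession())
            {
                byte[] data = session.Dequeue(); // null when the queue is empty
                session.Flush();                 // commit the dequeue
                return data == null ? null : Encoding.UTF8.GetString(data);
            }
        }
    }
}
```

Nesting the sessions inside the queue's using block means every session is disposed before the queue releases its lock on the storage location.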

Example

Queue on one thread, consume on another; retry some exceptions.

Note this is one queue being shared between two sessions. You should not open two queue instances for one storage location at once.

IPersistentQueue queue = new PersistentQueue("queue_a");
var t1 = new Thread(() =>
{
	while (HaveWork())
	{
		using (var session = queue.OpenSession())
		{
			session.Enqueue(NextWorkItem());
			session.Flush();
		}
	}
});
var t2 = new Thread(()=> {
	while (true) {
		using (var session = queue.OpenSession()) {
			var data = session.Dequeue();
			if (data == null) {Thread.Sleep(100); continue;}
			
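			try {
				MaybeDoWork(data);
				session.Flush(); // success: commit the dequeue
			} catch (RetryException) {
				continue; // no flush: disposing the session rolls the dequeue back, so the item is retried
			} catch {
				session.Flush(); // any other failure: commit the dequeue so the item is dropped
			}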
		}
	}
});

t1.Start();
t2.Start();

Example

Batch up a load of work and have another thread work through it.

IPersistentQueue queue = new PersistentQueue("batchQueue");
var worker = new Thread(()=> {
	using (var session = queue.OpenSession()) {
		byte[] data;
		while ((data = session.Dequeue()) != null) {
			MaybeDoWork(data);
			session.Flush();
		}
	}
});

using (var session = queue.OpenSession()) {
	foreach (var item in LoadsOfStuff()) {
		session.Enqueue(item);
	}
	session.Flush();
}

worker.IsBackground = true; // anything not complete when we close will be left on the queue for next time.
worker.Start();

Transactions

Each session is a transaction. Any enqueue or dequeue is rolled back when the session is disposed unless you call session.Flush() first. Data only becomes visible to other threads once it has been flushed. Each flush incurs a performance penalty: by default, every flush is persisted to disk before execution continues. You can trade safety for speed by setting queue.ParanoidFlushing = false;
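
The rollback rule can be made concrete with a small sketch. It is illustrative only: it assumes the DiskQueue assembly is referenced and its types live in the DiskQueue namespace, that the storage location starts empty, and the names TransactionSketch and DemonstrateRollback are hypothetical. If the semantics described above hold, the method should return true.

```csharp
using System;
using DiskQueue; // assumed namespace of the DiskQueue library

static class TransactionSketch // hypothetical name, for illustration only
{
    // Assumes storagePath holds no items from an earlier run.
    public static bool DemonstrateRollback(string storagePath)
    {
        using (var queue = new PersistentQueue(storagePath))
        {
            // Enqueue without Flush: rolled back when the session is disposed.
            using (var session = queue.OpenSession())
            {
                session.Enqueue(new byte[] { 1 });
            }

            // Enqueue with Flush: committed.
            using (var session = queue.OpenSession())
            {
                session.Enqueue(new byte[] { 2 });
                session.Flush();
            }

            // Dequeue without Flush: the item goes back on the queue at dispose.
            using (var session = queue.OpenSession())
            {
                session.Dequeue();
            }

            // Dequeue with Flush: the same committed item is returned again.
            using (var session = queue.OpenSession())
            {
                byte[] item = session.Dequeue();
                session.Flush();
                return item != null && item.Length == 1 && item[0] == 2;
            }
        }
    }
}
```

The retry pattern in the first example relies on the third step: leaving a session without flushing is what returns a failed work item to the queue.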

Data loss and transaction truncation

By default, DiskQueue silently discards transaction blocks that have been truncated, and throws an InvalidOperationException when transaction block markers are overwritten. Overwritten markers occur when more than one process uses the queue by mistake, and can also result from some kinds of disk corruption. If you construct your queue with throwOnConflict: false, all recoverable transaction errors are silently truncated instead. Use this only when uptime is more important than data consistency.

using (var queue = new PersistentQueue(path, Constants._32Megabytes, throwOnConflict: false)) {
    . . .
}

Multi-Process Usage

Each IPersistentQueue gives exclusive access to the storage until it is disposed. There is a static helper method PersistentQueue.WaitFor("path", TimeSpan...) which will wait to gain access until other processes release the lock or the timeout expires. If each process holds the lock for a short time and waits long enough, they can share a storage location.

E.g.

...
void AddToQueue(byte[] data) {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		session.Enqueue(data);
		session.Flush();
	}
}

byte[] ReadQueue() {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		var data = session.Dequeue();
		session.Flush();
		return data;
	}
}
...

If you need the transaction semantics of sessions across multiple processes, try a more robust solution like https://github.com/i-e-b/SevenDigital.Messaging
