bittnkr / uniq

License: GPL-3.0
A lock-free (multi reader / multi writer) circular buffered queue.

UniQ Multithreading Library

A multithreading class model built around a lock-free circular buffered queue.

This is a work in progress in its early stages of development. Critiques and pull requests are welcome.

For now, we have some tested features:

  • N threads
  • N buffer size (minimum 1 seat).
  • Constant cost per operation O(1).
  • Only 2 atomic variables.
  • Freely preempted.
  • Zero checksum.

A comprehensive description of the algorithm follows.

If you'd like to get your hands dirty and dive right into the code, start at test.cpp and queue.h. (We have implementations in C#, JavaScript and Pascal too.)

A lock-free circular buffered queue

This paper demonstrates a solution to the 3-thread consensus problem, implemented as an MRMW (multi-read/multi-write) circular buffer, in the context of a multi-threaded producer/consumer test case.

For the sake of simplicity, these docs use a simplified JavaScript-like pseudocode, familiar to anyone who uses C-like languages. For the real thing, refer to the source code.

The Queue object

class Queue(size) {
  data = Array(size); 
  in = 0, out = 0
  mask = size-1
   
  push(item) // send the item & return the id of the job 
  pop() // get the next item.
}

Properties

data is a common (non-atomic) array. It is updated without locking and is protected by in & out.

size must be a power of 2 (1, 2, 4, 8, 16...). The buffer is indexed through a binary mask (data[i & mask]), which keeps every memory access inside the array. At minimum, a single bit.

in holds the ID of the next element to be inserted. It is always ahead of or equal to out.

out is the ID of the oldest element (the next to be removed). It is never greater than in.

Both are atomic variables and are only ever incremented.

No boundary check is needed, because of the mask and the way integer overflow wraps around.
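To make the mask and overflow argument concrete, here is a small sketch. It assumes 32-bit unsigned counters, emulated in JavaScript with `>>> 0`; the helper names seatOf and distance are mine and not part of the library.

```javascript
// Hypothetical helpers; only size, mask, in and out come from the text above.
const size = 8;          // must be a power of 2
const mask = size - 1;   // binary 111

// Any counter value maps to a valid seat, so no boundary check is needed.
function seatOf(id) {
  return id & mask;
}

// Number of items between out and in, assuming 32-bit unsigned counters.
// `>>> 0` wraps the subtraction modulo 2^32, like unsigned overflow in C.
function distance(inId, outId) {
  return (inId - outId) >>> 0;
}

const outId = 0xFFFFFFFE;          // out, two steps before the counter wraps
const inId = (outId + 5) >>> 0;    // in, five pushes later, already wrapped

console.log("seat of id 13:", seatOf(13));
console.log("in after wrap:", inId);
console.log("items in queue:", distance(inId, outId));
console.log("seats around the wrap:", seatOf(0xFFFFFFFF), seatOf(0));
```

Because 2^32 is a multiple of every power-of-2 size, the seats stay contiguous when the counter wraps, and in - out still gives the right item count.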

Methods

Here is the heart of the solution, where we solve the 3-thread consensus.

Inserting data into the queue. The push() operation.

push(item) 
{
  let i
  do {
    i = in // get a local copy of the next id

    while (i - out == size) sleep() // if full, wait for space
  
  } while ( (data[i & mask]) || (CompareAndSwap(in, i+1, i) != i) )
  // if the seat is lost or CAS failed, try again. 

  data[i & mask] = item // now is safe to update the buffer using local i
  return i // id of the job 
}
  • If the thread is preempted at any point between i = in and CompareAndSwap(in, i+1, i), and another thread takes the seat in the meantime, the CAS will fail on return and the loop will go to the next seat, without any kind of locking.

  • I think that the (data[i & mask]) || check should not really be needed, but my computer hangs without it. It also avoids the expensive CAS instruction when the seat is still occupied.

  • Here I check the content for nullability, but in the C++ implementation this was replaced by an isFree boolean array.

Removing data from the queue. The pop() operation

pop() 
{
  let o
  do {
    o = out // id of the next candidate

    while (o == in) sleep() // if empty, wait for items

  } while ( !(data[o & mask]) || CompareAndSwap(out, o+1, o) != o )
  // if the candidate is gone or CAS failed, try again. 

  o &= mask // round to fit the buffer
  let r = data[o] // save the return value
  data[o] = 0 // release the seat
  return r
}

Both methods have two nested while() loops:

  • First we get the next seat/candidates with i = in and o = out

  • Then we check whether the buffer is full or empty, sleep()ing until the state changes.

  • If the seat/candidate is available, we try to increment the atomic register with CompareAndSwap(in, i+1, i) or CompareAndSwap(out, o+1, o).

  • If the CAS fails (!= i or != o), we go to the next seat.
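The steps above can be sketched in real JavaScript with the built-in Atomics object. This is only an illustration under my own assumptions, not the library's implementation: the names SketchQueue, tryPush and tryPop are hypothetical, items are non-zero 32-bit integers (0 marks a free seat), and instead of sleep()ing and retrying, each method returns null so the caller decides when to try again. Note that Atomics.compareExchange takes (array, index, expected, replacement), the reverse of the CompareAndSwap(target, new, expected) order used in the pseudocode.

```javascript
// Sketch only: SketchQueue, tryPush and tryPop are hypothetical names.
class SketchQueue {
  constructor(size) {
    if (size < 1 || (size & (size - 1)) !== 0) {
      throw new RangeError("size must be a power of 2");
    }
    this.size = size;
    this.mask = size - 1;
    // Shared memory, so the same buffers could be handed to worker threads.
    this.data = new Int32Array(new SharedArrayBuffer(4 * size)); // 0 = free seat
    this.ids = new Int32Array(new SharedArrayBuffer(8));         // [0] = in, [1] = out
  }

  // Returns the id of the job, or null if the queue is full or the seat was lost.
  tryPush(item) {
    const i = Atomics.load(this.ids, 0);                         // local copy of in
    if (((i - Atomics.load(this.ids, 1)) | 0) === this.size) return null; // full
    const seat = i & this.mask;
    if (Atomics.load(this.data, seat) !== 0) return null;        // seat still occupied
    // compareExchange returns the previous value; success means it equals i.
    if (Atomics.compareExchange(this.ids, 0, i, (i + 1) | 0) !== i) return null;
    Atomics.store(this.data, seat, item);                        // the seat is ours
    return i;
  }

  // Returns the next item, or null if the queue is empty or the candidate is gone.
  tryPop() {
    const o = Atomics.load(this.ids, 1);                         // local copy of out
    if (o === Atomics.load(this.ids, 0)) return null;            // empty
    const seat = o & this.mask;
    if (Atomics.load(this.data, seat) === 0) return null;        // item not written yet
    if (Atomics.compareExchange(this.ids, 1, o, (o + 1) | 0) !== o) return null;
    const r = Atomics.load(this.data, seat);                     // save the return value
    Atomics.store(this.data, seat, 0);                           // release the seat
    return r;
  }
}

const q = new SketchQueue(2);
console.log(q.tryPush(10), q.tryPush(20), q.tryPush(30)); // the third push finds the queue full
console.log(q.tryPop(), q.tryPop(), q.tryPop());          // the third pop finds it empty
```

The usage lines run on a single thread, so they only exercise the full and empty paths. The CAS failure path needs several threads competing for the same seat.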

Notes

  • The store data[o] = 0 and the load !(data[o & mask]) are naturally atomic.

  • The last thing done by pop() is to release the seat.

  • The flow happens without any kind of lock.

States

If in == out, the Queue is empty.

If in - out == size, the Queue is full.

In these cases the queue does not lock, but makes a voluntary preemption by calling the sleep() function.

Testing - The producer/consumer pattern

Now, with our Queue defined, it's time to put it under fire...

Let's create two groups of threads: one producing and another consuming data.

The first type of thread is the producer. It puts a bunch of numbers into the queue Q.

producer() 
{
  for(let i=1; i <= N; i++)
    Q.push(i) 
    
  Q.push(-1) // end of job

  Total += N
  log("Produced:", N)
}

Here we push -1 into the queue to signal the termination of the job. This can also be implemented with external flags.

Now, the work of the consumer is to remove elements from the queue until it receives a termination signal.

consumer()
{
  let value, count = 0

  while ( (value = Q.pop()) != -1 ) // stop at the termination signal
    count++ // count the items received

  Total -= count
  log("Consumed:", count)
}

Producers increment Total and consumers decrement it. At the end it must be zero, proving that there are no leaks.
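The accounting can be tried without real threads. The sketch below is a cooperative, single-threaded simulation of my own: generators stand in for threads and yield whenever the queue is full or empty. The ring buffer is a plain array, with no atomics, so it only illustrates the Total bookkeeping and the -1 terminator, not the lock-free behaviour. SIZE, N and PAIRS are arbitrary values chosen for the example.

```javascript
// Cooperative simulation: generators stand in for threads (an assumption of
// this sketch, not how the real test runs).
const SIZE = 4, N = 1000, PAIRS = 4;
const ring = new Array(SIZE).fill(0);
let inId = 0, outId = 0, Total = 0;

function* producer() {
  for (let i = 1; i <= N; i++) {
    while (inId - outId === SIZE) yield;      // full: give up the turn
    ring[inId++ & (SIZE - 1)] = i;
  }
  while (inId - outId === SIZE) yield;
  ring[inId++ & (SIZE - 1)] = -1;             // end of job
  Total += N;
}

function* consumer() {
  let count = 0;
  for (;;) {
    while (inId === outId) yield;             // empty: give up the turn
    const value = ring[outId++ & (SIZE - 1)];
    if (value === -1) break;                  // termination signal
    count++;
  }
  Total -= count;
}

// Round-robin scheduler: step every live "thread" until all have finished.
let threads = [];
for (let p = 0; p < PAIRS; p++) threads.push(producer(), consumer());
while (threads.length > 0) threads = threads.filter(t => !t.next().done);

console.log("Total:", Total);
```

Each consumer stops at the first -1 it sees, so the number of terminators must match the number of consumers, as it does in the test above.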

Running the horses

Let's start our threads and check that what we get out of the queue is equal to what we pushed into it.

let producers = []
  , consumers = []

for(let i=0; i < 4; i++)
{
  producers[i] = Thread(producer)
  consumers[i] = Thread(consumer)
}

wait(producers, consumers)

log("Total: %d", Total)
  • Here we create 8 threads to flow data. For the sake of simplicity, we use the same number of producers and consumers, but it works equally well in asymmetric conditions.
  • I've tested with up to 512 threads on an old Windows XP machine (where I got the best performance).

This is the output I got from the C++ implementation, flowing 10M items:

Creating 4 producers & 4 consumers
Flowing 10.000.000 items trough the queue.

Produced: 2.500.000
Consumed: 1.349.413
Produced: 2.500.000
Consumed: 1.359.293
Consumed: 3.876.642
Produced: 2.500.000
Consumed: 3.414.652
Produced: 2.500.000

Total: 0

real    0m0,581s

Note that every producer pushed the same number of items (2.5M), but the consumers got different quantities, ranging from 1.3M to 3.8M. This is the expected behaviour.

Next comes our expected Total: 0, proving that everything produced was consumed.

Then comes the time taken by the operation: 581 ms, a throughput of 17.2 M items/s. (Enough for an old Dell M6300 Core Duo.)
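For anyone who wants to check the arithmetic, the figures can be recomputed from the output above. The numbers are copied from the log; the variable names are mine.

```javascript
// Values copied from the "Consumed:" lines and the "real" time of the run.
const consumed = [1349413, 1359293, 3876642, 3414652];
const seconds = 0.581;

const items = consumed.reduce((a, b) => a + b, 0);   // total items that left the queue
const throughput = items / seconds;                  // items per second
const nsPerItem = (seconds / items) * 1e9;           // average wall time per item

console.log("Items consumed:", items);
console.log("Throughput:", (throughput / 1e6).toFixed(1), "M items/s");
console.log("Time per item:", nsPerItem.toFixed(1), "ns");
```

The four consumer counts add up to the 10M items produced, which is the same fact that Total: 0 reports.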

I made a series of benchmarks, varying the buffer size and the number of threads. The results follow.

Benchmarks - Measuring the flow

[Chart: Throughput x Buffer Size]

With the default buffer size (64), varying the number of threads:

[Chart: Throughput x Threads]

  • The cost per operation with 2 threads is the same as with 512.

Code released under the GNU GPL 3.0 license and docs under the Creative Commons (CC BY-SA 3.0) license.
