status-im / nim-taskpools

Taskpools

This library implements a lightweight, energy-efficient, easily auditable multithreaded taskpool.

This taskpool will be used in a highly security-sensitive blockchain application targeted at resource-restricted devices. The desirable properties are therefore:

  • Ease of auditing and maintenance.
    • Formally verified synchronization primitives are highly sought after.
    • Otherwise, primitives are implemented from papers or ported from proven codebases that can serve as a reference for auditors.
  • Resource-efficient. Threads spin down to save power, and memory use is low.
  • Decent performance and scalability. The CPU should spend its time processing user workloads and not dealing with threadpool contention, latencies and overheads.

Example usage

# Demo of the API using a very inefficient π approximation algorithm.

import
  std/[strutils, math, cpuinfo],
  taskpools

# From https://github.com/nim-lang/Nim/blob/v1.6.2/tests/parallel/tpi.nim
# Leibniz Formula https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80
proc term(k: int): float =
  if k mod 2 == 1:
    -4'f / float(2*k + 1)
  else:
    4'f / float(2*k + 1)

proc piApprox(tp: Taskpool, n: int): float =
  var pendingFuts = newSeq[FlowVar[float]](n)
  for k in 0 ..< pendingFuts.len:
    pendingFuts[k] = tp.spawn term(k) # Schedule a task on the threadpool and return a handle to retrieve the result.
  for k in 0 ..< pendingFuts.len:
    result += sync pendingFuts[k]     # Block until the result is available.

proc main() =
  var n = 1_000_000
  var nthreads = countProcessors()

  var tp = Taskpool.new(numThreads = nthreads)  # Default to the number of hardware threads.

  echo formatFloat(tp.piApprox(n))

  tp.syncAll()                                  # Block until all pending tasks are processed (implied in tp.shutdown())
  tp.shutdown()

# Compile with nim c -r -d:release --threads:on --outdir:build example.nim
main()
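
The demo above spawns one task per term, so scheduling overhead dominates the arithmetic. A minimal sketch of a coarser-grained variant follows, assuming only the `spawn`/`sync` API shown above. The names `partialSum` and `piApproxChunked` and the chunk size are hypothetical, chosen for illustration.

```nim
import std/cpuinfo
import taskpools

proc term(k: int): float =
  # k-th term of the Leibniz series for π.
  if k mod 2 == 1:
    -4.0 / float(2*k + 1)
  else:
    4.0 / float(2*k + 1)

proc partialSum(lo, hi: int): float =
  # Sum of the terms with index in [lo, hi).
  for k in lo ..< hi:
    result += term(k)

proc piApproxChunked(tp: Taskpool, n, chunk: int): float =
  # One task per chunk of terms instead of one task per term.
  let numChunks = (n + chunk - 1) div chunk
  var pendingFuts = newSeq[Flowvar[float]](numChunks)
  for i in 0 ..< numChunks:
    let lo = i * chunk
    let hi = min(lo + chunk, n)
    pendingFuts[i] = tp.spawn partialSum(lo, hi)
  for i in 0 ..< numChunks:
    result += sync pendingFuts[i]

proc main() =
  var tp = Taskpool.new(numThreads = countProcessors())
  echo tp.piApproxChunked(n = 1_000_000, chunk = 10_000)
  tp.shutdown()

main()
```

With these values the pool schedules 100 tasks rather than one million. The best chunk size depends on the workload and is not something the source specifies.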

API

The API follows the spec proposed in a comment on nim-lang/RFCs#347.

The following types and procedures are exposed:

  • Taskpool:
    • type Taskpool* = ptr object
        ## A taskpool schedules procedures to be executed in parallel
    • proc new*(T: type Taskpool, numThreads = countProcessors()): T
        ## Initialize a threadpool that manages `numThreads` threads.
        ## Default to the number of logical processors available.
    • proc syncAll*(pool: Taskpool) =
        ## Blocks until all pending tasks are completed.
        ##
        ## This MUST only be called from
        ## the root thread that created the taskpool
    • proc shutdown*(tp: var Taskpool) =
        ## Wait until all tasks are completed and then shutdown the taskpool.
        ##
        ## This MUST only be called from
        ## the root scope that created the taskpool.
    • macro spawn*(tp: Taskpool, fnCall: typed): untyped =
        ## Spawns the input function call asynchronously, potentially on another thread of execution.
        ##
        ## If the function call returns a result, spawn will wrap it in a Flowvar.
        ## You can use `sync` to block the current thread and extract the asynchronous result from the flowvar.
        ## You can use `isReady` to check if the result is available and if a subsequent
        ## `sync` returns immediately.
        ##
        ## Tasks are processed approximately in Last-In-First-Out (LIFO) order
      In practice the signature is one of the following:
      proc spawn*(tp: Taskpool, fnCall(args) -> T): Flowvar[T]
      proc spawn*(tp: Taskpool, fnCall(args) -> void): void
  • Flowvar, a handle on an asynchronous computation scheduled on the threadpool
    • type Flowvar*[T] = object
        ## A Flowvar is a placeholder for a future result that may be computed in parallel
    • func isSpawned*(fv: Flowvar): bool =
        ## Returns true if a flowvar is spawned
        ## This may be useful for recursive algorithms that
        ## may or may not spawn a flowvar depending on a condition.
        ## This is similar to Option or Maybe types
    • func isReady*[T](fv: Flowvar[T]): bool =
        ## Returns true if the result of a Flowvar is ready.
        ## In that case `sync` will not block.
        ## Otherwise `sync` will block the current thread, which helps process
        ## the pending tasks until the Flowvar is ready.
    • proc sync*[T](fv: sink Flowvar[T]): T =
        ## Blocks the current thread until the flowvar's result is available,
        ## then returns it.
        ## The thread is not idle and will complete pending tasks.
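
As an illustration of `isSpawned` in a recursive algorithm, the sketch below spawns a subproblem only above a size threshold. It assumes that a default-initialized `Flowvar` reports `isSpawned == false` and that a task may receive the `Taskpool` handle as an argument; neither is stated in the listing above. `Cutoff` and `fib` are hypothetical names.

```nim
import taskpools

const Cutoff = 20  # hypothetical threshold below which no task is spawned

proc fib(tp: Taskpool, n: int): int =
  if n < 2:
    return n
  var left: Flowvar[int]            # assumed to start out as "not spawned"
  var x = 0
  if n > Cutoff:
    left = tp.spawn fib(tp, n - 1)  # large subproblem: may run on another thread
  else:
    x = fib(tp, n - 1)              # small subproblem: compute inline
  let y = fib(tp, n - 2)
  if left.isSpawned:
    x = sync left                   # blocks, running pending tasks while waiting
  result = x + y

proc main() =
  var tp = Taskpool.new(numThreads = 4)
  echo tp.fib(25)
  tp.shutdown()

main()
```

Checking `isSpawned` before `sync` lets one code path serve both the spawned and the inline case, which is the Option-like use the doc comment describes.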

Non-goals

The following are non-goals:

  • Supporting GC-ed types with Nim default GC (sequences and strings). Use no GC, or --gc:arc, --gc:orc or --gc:boehm instead (any GC that doesn't have thread-local heaps).
  • Having async-awaitable tasks
  • Running on environments without dynamic memory allocation
  • High-Performance Computing specifics (distribution on many machines or GPUs, or machines with 200+ cores or multiple sockets)
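
Because sequences and strings under the default GC are out of scope, one way to share data with tasks is a manually managed buffer passed by pointer. The sketch below assumes that plain pointers and integers are acceptable task arguments; `fillSquares` and `sumOfSquares` are hypothetical names. It also shows the `void` form of `spawn` together with `syncAll`.

```nim
import taskpools

proc fillSquares(buf: ptr UncheckedArray[int], lo, hi: int) =
  # Each task writes a disjoint slice [lo, hi), so no locking is needed.
  for i in lo ..< hi:
    buf[i] = i * i

proc sumOfSquares(tp: Taskpool, n, chunk: int): int =
  # Manually managed shared memory instead of a GC-ed seq.
  let buf = cast[ptr UncheckedArray[int]](allocShared0(n * sizeof(int)))
  var lo = 0
  while lo < n:
    tp.spawn fillSquares(buf, lo, min(lo + chunk, n))  # no result, so no Flowvar
    lo += chunk
  tp.syncAll()  # must run on the thread that created the pool
  for i in 0 ..< n:
    result += buf[i]
  deallocShared(buf)

proc main() =
  var tp = Taskpool.new(numThreads = 4)
  echo tp.sumOfSquares(n = 1000, chunk = 250)
  tp.shutdown()

main()
```

The buffer is freed only after `syncAll` returns, so no task can still be writing to it.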

Comparison with Weave

Compared to Weave, here are the tradeoffs:

  • Taskpools only provide spawn/sync (task parallelism).
    There is no (extremely) optimized parallel for (data parallelism)
    or precise in/out dependencies (events / dataflow parallelism).
  • Weave can handle trillions of small tasks that require only 10µs per task. (Load Balancing overhead)
  • Weave maintains an adaptive memory pool to reduce memory allocation overhead, whereas Taskpools allocates memory as needed. (Scheduler overhead)

License

Licensed and distributed under either of

at your option. This file may not be copied, modified, or distributed except according to those terms.
