
alphatwirl / mantichora

Licence: BSD-3-Clause license
A simple interface to Python multiprocessing and threading


Mantichora

A simple interface to multiprocessing and threading


Mantichora provides a simple interface to multiprocessing and threading.

from mantichora import mantichora

with mantichora() as mcore:
    mcore.run(func1)
    mcore.run(func2)
    mcore.run(func3)
    mcore.run(func4)
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |    12559 /    12559 |:  func1
  71.27% ::::::::::::::::::::::::::::             |    28094 /    39421 |:  func2
  30.34% ::::::::::::                             |    28084 /    92558 |:  func3
  35.26% ::::::::::::::                           |    27282 /    77375 |:  func4

You can give Mantichora as many functions as you need to run. Mantichora runs them concurrently, either in background processes with multiprocessing or in separate threads with threading, and gives you their return values. The return values are sorted in the order in which you originally gave the functions to Mantichora. The functions can use progress bars from atpbar.

The code in this package was originally developed in alphatwirl, as its sub-package concurrently.

The examples in this file can also be run on Jupyter Notebook.



Requirement

  • Python 3.6, 3.7, 3.8, or 3.9

Install

You can install with conda from conda-forge:

conda install -c conda-forge mantichora

or with pip:

pip install -U mantichora

User guide

Quick start

This section shows how to use Mantichora with simple examples.

Import libraries

We are going to use two Python standard libraries, time and random, in an example task function. The task function also uses atpbar for progress bars. Import these packages and mantichora.

import time, random
from atpbar import atpbar
from mantichora import mantichora

Define a task function

Let us define a simple task function.

def task_loop(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    return ret

The function sleeps for 0.0001 seconds n times, where n is an integer chosen at random between 1000 and 10000 (inclusive). atpbar shows a progress bar. The function takes two arguments: name, the label on the progress bar, and ret, the return value of the function.

Note: In the multiprocessing mode (the default mode of mantichora), task functions, their arguments, and their return values need to be picklable.
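
If you are unsure whether a task function, its arguments, or its return value can be sent to a worker process, you can run a rough pre-check with the standard pickle module. This is a minimal sketch. is_picklable is a hypothetical helper, not part of mantichora. Passing the check does not guarantee that every object will reach a worker intact.

```python
import pickle
import threading


def is_picklable(obj):
    """Hypothetical helper: return True if pickle can serialize obj (rough pre-check)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(('task1', 'result1')))  # True: tuples of strings pickle fine
print(is_picklable(lambda: None))          # False: lambdas cannot be pickled by reference
print(is_picklable(threading.Lock()))      # False: locks are not picklable
```

Module-level functions such as task_loop are pickled by reference (module and name), so define task functions at the top level of a module rather than as lambdas or nested functions.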

You can first try running this function on its own, without Mantichora.

result = task_loop('task1', 'result1')

This call doesn't return immediately; it waits for the function to finish while showing a progress bar.

 100.00% :::::::::::::::::::::::::::::::::::::::: |    58117 /    58117 |:  task1

The return value is stored in result.

print(result)
result1

Run tasks concurrently with Mantichora

Now, we run multiple tasks concurrently with Mantichora.

with mantichora(nworkers=3) as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

In the example above, mantichora is initialized with the optional argument nworkers, which specifies the number of workers: 3 in this example (the default is 4). At most nworkers tasks run concurrently.

The with statement ensures that mantichora properly ends the workers when the block exits.

mcore.run() takes a task function followed by the function's arguments. You can call mcore.run() as many times as you need. In the above example, mcore.run() is called with the same task function and different arguments each time, but you can also use a different function in each call. mcore.run() returns immediately; it doesn't wait for the task to finish or even to start. Each call only puts a task in a queue. The workers in background processes pick up tasks from the queue and run them.
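
The queue-and-workers model described above can be sketched with the standard library. This is a minimal illustration, not mantichora's implementation, and it rests on several assumptions. MiniPool and slow_echo are hypothetical names. The workers are threads rather than processes. An exception raised by a task is stored as that task's result.

```python
import queue
import threading
import time


class MiniPool:
    """Hypothetical queue-and-workers pool (threads); not mantichora's implementation."""

    def __init__(self, nworkers=4):
        self._tasks = queue.Queue()
        self._results = {}                 # run ID -> return value
        self._lock = threading.Lock()
        self._next_runid = 0
        self._workers = [threading.Thread(target=self._work, daemon=True)
                         for _ in range(nworkers)]
        for worker in self._workers:
            worker.start()

    def _work(self):
        while True:
            item = self._tasks.get()
            if item is None:               # sentinel from end(): stop this worker
                self._tasks.task_done()
                return
            runid, func, args, kwargs = item
            try:
                ret = func(*args, **kwargs)
            except Exception as exc:       # sketch: keep the error as the result
                ret = exc
            with self._lock:
                self._results[runid] = ret
            self._tasks.task_done()

    def run(self, func, *args, **kwargs):
        """Enqueue a task and return its run ID without waiting for it."""
        runid = self._next_runid
        self._next_runid += 1
        self._tasks.put((runid, func, args, kwargs))
        return runid

    def returns(self):
        """Wait for all queued tasks, then return results sorted by run ID."""
        self._tasks.join()
        with self._lock:
            ordered = [self._results[k] for k in sorted(self._results)]
            self._results.clear()
        return ordered

    def end(self):
        """Send one stop sentinel per worker and wait for all workers to exit."""
        for _ in self._workers:
            self._tasks.put(None)
        for worker in self._workers:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.end()
        return False


def slow_echo(delay, ret):
    time.sleep(delay)
    return ret


with MiniPool(nworkers=3) as pool:
    runids = [pool.run(slow_echo, delay, ret)
              for delay, ret in [(0.03, 'a'), (0.0, 'b'), (0.01, 'c')]]
    ordered = pool.returns()

print(runids)   # [0, 1, 2]
print(ordered)  # ['a', 'b', 'c']
```

Because run() only enqueues, the three tasks start right away and finish in a different order. returns() still lists the results by run ID.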

mcore.returns() waits until all tasks finish and returns their return values in the order in which the tasks were given to mcore.run().

atpbar shows the progress bars.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     1415 /     1415 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7770 /     7770 |:  task again
 100.00% :::::::::::::::::::::::::::::::::::::::: |    18431 /    18431 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    25641 /    25641 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    74669 /    74669 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    87688 /    87688 |:  another task

The results are sorted in the original order regardless of the order in which the tasks have finished.

print(results)
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']

Features

Multiprocessing or Threading

New in version 0.10.0

Since version 0.10.0, you can use threading instead of multiprocessing by setting the option mode to 'threading' (the default is 'multiprocessing').

with mantichora(mode='threading') as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()
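
The mode option parallels a familiar choice in the standard library, where concurrent.futures offers ThreadPoolExecutor and ProcessPoolExecutor. As a rule of thumb, threads suit tasks that mostly wait (I/O, sleeping, or code that releases the GIL). Processes suit CPU-bound pure-Python work, at the cost of pickling. The helper make_executor below is hypothetical. It only mirrors the idea of the mode switch and is not part of mantichora.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(mode='multiprocessing', nworkers=4):
    """Hypothetical helper: pick a standard-library executor by mode name."""
    if mode == 'multiprocessing':
        return ProcessPoolExecutor(max_workers=nworkers)
    if mode == 'threading':
        return ThreadPoolExecutor(max_workers=nworkers)
    raise ValueError(f'unknown mode: {mode!r}')


def square(x):
    return x * x


with make_executor('threading', nworkers=3) as executor:
    results = list(executor.map(square, [1, 2, 3, 4]))  # map() keeps input order

print(results)  # [1, 4, 9, 16]
```

Executor.map() returns results in input order, much like mcore.returns().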

Without the with statement

end()

If you don't use the with statement, you need to call end().

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

results = mcore.returns()

mcore.end()
print(results)
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4695 /     4695 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7535 /     7535 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9303 /     9303 |:  another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9380 /     9380 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     5812 /     5812 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9437 /     9437 |:  task again
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']

terminate()

In the multiprocessing mode, mantichora can be terminated with terminate(). After terminate() is called, end() still needs to be called. In the example below, terminate() is called after 0.5 seconds of sleep while some tasks are still running.

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

time.sleep(0.5)

mcore.terminate()
mcore.end()

The progress bars stop when the tasks are terminated.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     2402 /     2402 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3066 /     3066 |:  another task
  59.28% :::::::::::::::::::::::                  |     2901 /     4894 |:  task
  69.24% :::::::::::::::::::::::::::              |     2919 /     4216 |:  yet another task
   0.00%                                          |        0 /     9552 |:  task again
   0.00%                                          |        0 /     4898 |:  more task

Note: In the threading mode, terminate() does not do anything. If you initialize mantichora in the threading mode, i.e., mantichora(mode='threading'), in the above example, all tasks run until completion.
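
Python threads cannot be forcibly stopped from outside. A common workaround is therefore cooperative cancellation: the task function checks a threading.Event and returns early once the event is set. This is a minimal sketch with plain threads. task_until_stopped and its stop_event argument are hypothetical and not part of mantichora.

```python
import threading
import time


def task_until_stopped(stop_event, n, ret=None):
    """Hypothetical task: do n small steps, but return None early if stop_event is set."""
    for _ in range(n):
        if stop_event.is_set():
            return None            # cooperative early exit
        time.sleep(0.0001)
    return ret


# Plain-threading demonstration (no mantichora): stop a long task after 0.05 s.
stop = threading.Event()
outcome = {}
worker = threading.Thread(
    target=lambda: outcome.update(ret=task_until_stopped(stop, 10**7, 'result1')))
worker.start()
time.sleep(0.05)
stop.set()                         # ask the task to stop
worker.join()
print(outcome['ret'])              # None: the task returned early
```

Threads share memory, so in mantichora's threading mode the same Event could presumably be passed as an ordinary argument, for example mcore.run(task_until_stopped, stop, 5000, ret='result1'). This usage is untested here. A threading.Event cannot be pickled, so the pattern does not carry over to the multiprocessing mode.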


Receive results as tasks finish

Instead of waiting for all tasks to finish before receiving the results, you can get results as tasks finish with the method receive_one() or receive_finished().

receive_one()

The method receive_one() returns a pair of the run ID and the return value of a finished task function. If no task has finished yet, receive_one() waits until one does. If no tasks are outstanding, it returns None. The run ID of each task is the value returned by run().

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    for i in range(len(runids)):
        pairs.append(mcore.receive_one())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     1748 /     1748 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4061 /     4061 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2501 /     2501 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2028 /     2028 |:  task6
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8206 /     8206 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9157 /     9157 |:  task2

runids is the list of the run IDs, in the order in which the tasks were given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (0, 'result1'), (4, 'result5'), (5, 'result6'), (3, 'result4'), (1, 'result2')]
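
The behavior of receive_one() resembles iterating over concurrent.futures.as_completed() in the standard library. Results arrive in completion order, and a run ID kept alongside each future links each result back to its task. This is a minimal sketch of that analogy with threads. echo_after and receive_in_finish_order are hypothetical names, and the code does not show how mantichora is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def echo_after(delay, ret):
    time.sleep(delay)
    return ret


def receive_in_finish_order(tasks, nworkers=4):
    """Run (func, args) tasks; return (run ID, result) pairs in completion order."""
    with ThreadPoolExecutor(max_workers=nworkers) as executor:
        # Map each future back to its run ID, i.e., its position in tasks.
        runid_of = {executor.submit(func, *args): runid
                    for runid, (func, args) in enumerate(tasks)}
        return [(runid_of[fut], fut.result()) for fut in as_completed(runid_of)]


tasks = [(echo_after, (0.4, 'result1')),
         (echo_after, (0.0, 'result2')),
         (echo_after, (0.2, 'result3'))]
pairs = receive_in_finish_order(tasks)
print(pairs)
```
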

receive_finished()

The method receive_finished() returns a list of pairs of the run ID and the return value of finished task functions. The method receive_finished() doesn't wait for a task to finish. It returns an empty list if no task has finished.

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    while len(pairs) < len(runids):
        pairs.extend(mcore.receive_finished())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3979 /     3979 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6243 /     6243 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6640 /     6640 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8632 /     8632 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6235 /     6235 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8325 /     8325 |:  task6

runids is the list of the run IDs, in the order in which the tasks were given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (1, 'result2'), (0, 'result1'), (3, 'result4'), (4, 'result5'), (5, 'result6')]
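
In the standard library, receive_finished() corresponds to polling futures without blocking. This sketch follows the same idea and uses Future.done() to pick out finished tasks. receive_finished_futures and echo_after are hypothetical helpers, not part of mantichora. Unlike the loop in the example above, this one sleeps briefly between polls, so the loop does not keep a CPU core busy while it waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def echo_after(delay, ret):
    time.sleep(delay)
    return ret


def receive_finished_futures(pending):
    """Non-blocking: remove and return (run ID, result) pairs for done futures; [] if none."""
    finished = [(runid, fut.result()) for runid, fut in pending.items() if fut.done()]
    for runid, _ in finished:
        del pending[runid]
    return finished


with ThreadPoolExecutor(max_workers=3) as executor:
    pending = {runid: executor.submit(echo_after, delay, f'result{runid + 1}')
               for runid, delay in enumerate([0.2, 0.05, 0.1])}
    pairs = []
    while pending:
        pairs.extend(receive_finished_futures(pending))
        time.sleep(0.01)  # short pause so the polling loop does not spin at full speed

print(sorted(pairs))  # [(0, 'result1'), (1, 'result2'), (2, 'result3')]
```
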

Logging

In the multiprocessing mode, log records emitted in background processes are propagated to the main process. The propagation is implemented as described in a section of the Logging Cookbook.

Note: In the threading mode, logging just works because the logging module is thread-safe.

Here is a simple example task function that uses logging. The task function logs a message just before returning.

import logging

def task_log(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    logging.info('finishing "{}"'.format(name))
    return ret

Direct the logging output to an in-memory string stream so that the log can be retrieved later as a string.

import io
stream = io.StringIO()
logging.basicConfig(level=logging.INFO, stream=stream)

Run the tasks.

with mantichora() as mcore:
    mcore.run(task_log, 'task1', ret='result1')
    mcore.run(task_log, 'task2', ret='result2')
    mcore.run(task_log, 'task3', ret='result3')
    mcore.run(task_log, 'task4', ret='result4')
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4217 /     4217 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7691 /     7691 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8140 /     8140 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9814 /     9814 |:  task4

Log messages emitted by the task function in background processes are sent to the main process and written to the string stream.

print(stream.getvalue())
INFO:root:finishing "task2"
INFO:root:finishing "task3"
INFO:root:finishing "task1"
INFO:root:finishing "task4"
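
The Logging Cookbook pattern mentioned above sends log records from workers through a queue. Each worker attaches a logging.handlers.QueueHandler, and the main process runs a logging.handlers.QueueListener that writes the records to the real handlers. The sketch below shows that pattern using only standard-library calls. It is not mantichora's code, and the helper names start_listener and worker_logger are hypothetical. To keep the sketch runnable in one process, a thread stands in for the worker. Across processes, the queue would instead be a multiprocessing.Queue shared with the workers.

```python
import io
import logging
import logging.handlers
import queue
import threading


def start_listener(log_queue, stream):
    """Main-process side: pull records off the queue and write them to stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
    listener = logging.handlers.QueueListener(log_queue, handler)
    listener.start()
    return listener


def worker_logger(log_queue, name='task_worker'):
    """Worker side: a logger whose only handler puts records on the queue."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers[:] = [logging.handlers.QueueHandler(log_queue)]
    logger.propagate = False  # avoid also writing the record through the root logger
    return logger


# Demonstration inside one process; a worker thread stands in for a worker process.
log_queue = queue.Queue()
stream = io.StringIO()
listener = start_listener(log_queue, stream)
t = threading.Thread(target=lambda: worker_logger(log_queue).info('finishing "task1"'))
t.start()
t.join()
listener.stop()  # processes queued records, then stops the listener thread
print(stream.getvalue())  # INFO:task_worker:finishing "task1"
```
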

Start method of multiprocessing

Updated in version 0.12.0

Python multiprocessing has three start methods: fork, spawn, and forkserver. These methods are described in the Python documentation. By default, Mantichora uses fork on Linux and macOS and spawn on Windows. You can change the method with the option mp_start_method.

with mantichora(mp_start_method='spawn') as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

  • On Jupyter Notebook, the fork method is typically the best choice.
  • The spawn and forkserver methods have extra restrictions, for example, on how the main module is written. The restrictions are described in the Python documentation.
  • On macOS, with the fork method, errors with the message "may have been in progress in another thread when fork() was called" might occur. Setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY to YES, as suggested on Stack Overflow, might resolve the error.
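
In the standard library, a start method is selected through multiprocessing.get_context(). The sketch below reproduces the default choice described above and the main-module guard that spawn and forkserver require. It is an assumption-laden illustration, not mantichora's code. default_start_method and make_context are hypothetical helpers.

```python
import multiprocessing
import sys


def default_start_method():
    """Hypothetical: mirror the default described above ('spawn' on Windows, else 'fork')."""
    return 'spawn' if sys.platform == 'win32' else 'fork'


def make_context(method=None):
    # get_context() returns a context whose Process, Queue, etc. use the given method.
    return multiprocessing.get_context(method or default_start_method())


if __name__ == '__main__':
    # With 'spawn' and 'forkserver', code that starts workers belongs under this guard,
    # because child processes re-import the main module.
    print(multiprocessing.get_all_start_methods())
    print(make_context().get_start_method())
```
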

License

  • mantichora is licensed under the BSD license.

Contact
