
fikipollo / PySiQ

License: MIT
A Python Simple Queue system for your apps

Programming Languages

python
139335 projects - #7 most used programming language
HTML
75241 projects

Projects that are alternatives of or similar to PySiQ

fastlane
Fastlane is a redis and docker based queueing service.
Stars: ✭ 48 (+108.7%)
Mutual labels:  queue, task-manager
celery.node
Celery task queue client/worker for nodejs
Stars: ✭ 164 (+613.04%)
Mutual labels:  queue, task-manager
minavdrawer
Easy to add different animations into standard NavigationDrawer.
Stars: ✭ 93 (+304.35%)
Mutual labels:  easy-to-use
SimplePHP
A small query builder project designed to assist daily routines and speed up the process of communicating with the database.
Stars: ✭ 14 (-39.13%)
Mutual labels:  easy-to-use
think-async
🌿 Exploring cooperative concurrency primitives in Python
Stars: ✭ 178 (+673.91%)
Mutual labels:  queue
MarkDEditor
A WYSIWYG MarkDown editor for Android.
Stars: ✭ 76 (+230.43%)
Mutual labels:  easy-to-use
SwiftUI-App
This swiftUI Demo is very simple & easy to understand. This swiftUI demo includes On-boarding screens, login screen, forgot password screen, sign up screen, home & logout.
Stars: ✭ 175 (+660.87%)
Mutual labels:  easy-to-use
Algorithms
Data Structures & Algorithms. Includes solutions for Cracking the Coding Interview 6th Edition
Stars: ✭ 89 (+286.96%)
Mutual labels:  queue
zmq
ZeroMQ based distributed patterns
Stars: ✭ 27 (+17.39%)
Mutual labels:  queue
Nickel
Micro tasks manager written in pure Python
Stars: ✭ 18 (-21.74%)
Mutual labels:  task-manager
umock-c
A pure C mocking library
Stars: ✭ 29 (+26.09%)
Mutual labels:  easy-to-use
rearq
A distributed task queue built with asyncio and redis, with built-in web interface
Stars: ✭ 81 (+252.17%)
Mutual labels:  queue
aioconnectors
Simple secure asynchronous message queue
Stars: ✭ 17 (-26.09%)
Mutual labels:  easy-to-use
node-svmq
Native System V message queues in Node.js
Stars: ✭ 16 (-30.43%)
Mutual labels:  queue
cookie-extraction
Extracts online cookies after login and uploads them to a server or copies them to the clipboard, letting crawlers skip complex CAPTCHA-recognition steps.
Stars: ✭ 46 (+100%)
Mutual labels:  easy-to-use
obsidian-tasks
Task management for the Obsidian knowledge base.
Stars: ✭ 648 (+2717.39%)
Mutual labels:  task-manager
DEPRECATED-data-structures
A collection of powerful data structures
Stars: ✭ 2,648 (+11413.04%)
Mutual labels:  queue
TaskManager
A C++14 Task Manager / Scheduler
Stars: ✭ 81 (+252.17%)
Mutual labels:  task-manager
slated-obsidian
Task management in Obsidian.md
Stars: ✭ 123 (+434.78%)
Mutual labels:  task-manager
Fortress-of-Solitude
This Library has resources to solve common data structure algorithm problems like a Doubly linked list, Generic trees, Queue, Stack, and other algorithms. Each lib has an option to carry your custom data in elements. Custom data in detail, other fantastic resources.
Stars: ✭ 53 (+130.43%)
Mutual labels:  queue

Table of contents

  • Introduction
  • Features
  • Example of use 1
  • Example of use 2
  • Advanced use
  • Using PySiQ as a self-contained service

Introduction

PySiQ (Python Simple Queue) is a job queue, or task queue, implemented for Python applications. The main objective of a task queue is to avoid running resource-intensive tasks immediately and blocking while they complete. Instead, tasks are scheduled by adding them to a queue, where they wait until a worker, i.e. a dedicated process running in a separate thread, takes them out of the queue and executes the job. This concept is especially useful for web applications, where it is not possible to handle a heavy task within the short window of an HTTP request.

Features

PySiQ is entirely implemented in Python and provides the following features:

  • Concurrent execution of tasks, with a configurable number of workers.
  • The status of queued tasks can be checked at any time.
  • Dependencies between tasks can be declared so that they are executed in the appropriate order.
  • Incompatible tasks can be declared, preventing conflicting tasks from running simultaneously.
  • Task results are stored until the client asks for them.
  • Lightweight: the module is less than 300 lines of code.
  • Easy to install and use in your application.
  • No dependencies on other libraries or tools.
  • Available as a module imported into your application, or as a self-contained service accessible through API calls.

The main component of PySiQ is the Queue, a Python object that works as a task dispatcher and worker-pool manager. The Queue is a singleton instance that listens to the other components in the application (Figure 1-1). When the queue is instantiated, a certain number of Workers are created, depending on the user's settings. Workers are special threads that extract tasks from the queue and execute them. By default, workers are idle, waiting for new tasks to be sent to the queue (Figure 1-2). When a client needs to execute a time-consuming job, the job is encapsulated in a Task instance, which defines the function or code to be executed and the parameters for its execution (Figure 1-3). Additional parameters can be specified, such as a timeout that aborts the task if it does not start within a given amount of time, and a list of dependencies, i.e. the identifiers of the tasks that must be completed before the new task can start. The Task instance is sent to the queue, and the workers are notified that a new task is waiting to be executed. As soon as a worker is idle, it takes the next task from the queue and starts executing it, provided that all of its dependencies have already finished (Figure 1-4).

The queue contains an internal table that keeps the status of every task in the queue. Possible statuses are "waiting", "running", "finished", and "error" (Figure 1-5). When a task finishes, it is kept in this table, along with the results of its execution, until a client asks for the results. Similarly, failed tasks are kept in the table together with the error information (Figure 1-6).

Figure 1. Overview of the design of the queue system.
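The dispatcher/worker mechanism described above can be sketched with the standard library alone. This is an illustrative model, not PySiQ's actual implementation; the function and variable names here are hypothetical:

```python
import queue
import threading

task_queue = queue.Queue()
status = {}  # internal status table: task_id -> "waiting" | "running" | "finished" | "error"

def enqueue(task_id, fn, *args):
    # New tasks enter the table as "waiting"; workers are notified
    # implicitly, because Queue.put() wakes any thread blocked in Queue.get().
    status[task_id] = "waiting"
    task_queue.put((task_id, fn, args))

def worker_loop():
    # Idle workers block here until a task arrives.
    while True:
        task_id, fn, args = task_queue.get()
        status[task_id] = "running"
        try:
            fn(*args)
            status[task_id] = "finished"
        except Exception:
            status[task_id] = "error"
        task_queue.task_done()

N_WORKERS = 2  # configurable, as in PySiQ
for _ in range(N_WORKERS):
    threading.Thread(target=worker_loop, daemon=True).start()
```

Each worker spends its idle time blocked in `Queue.get()`, so no polling loop is needed; the status table outlives the task so a client can query it later.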

Example of use 1 (file test/test_1.py)

The following code fragment exemplifies the usage of the module. For this use case, a queue instance is initialized with two workers, and a total of five tasks are sent to the queue. All tasks execute the same function, foo, which displays a message indicating that execution has started, waits N seconds, and then displays a message announcing the end of execution. Both the displayed message and the duration of the delay (the value of N) are provided as parameters to foo.

Tasks 1 and 3 take 10 seconds each to execute, while task 2 takes 4 seconds and task 4 takes 5 seconds. Task 5 takes 4 seconds, but it will not start until tasks 3 and 4 have completed.

Figure 2 shows the timeline of the execution of the tasks, as well as the status of the queue and the workers at different time points.

# ************************************************************************
# Initialize queue
# ************************************************************************
from pysiq import Queue  # adjust the import to match your PySiQ installation

N_WORKERS = 2

queue_instance = Queue()
queue_instance.start_worker(N_WORKERS)


# ************************************************************************
# Queue tasks
# ************************************************************************
from time import sleep

def foo(N, message):
    print(message + " started...")
    sleep(N)
    print(message + " finished")

queue_instance.enqueue(
    fn=foo,
    args=(10, "Task 1"),
    task_id="Task 1"
)

queue_instance.enqueue(
    fn=foo,
    args=(4, "Task 2"),
    task_id="Task 2"
)

queue_instance.enqueue(
    fn=foo,
    args=(10, "Task 3"),
    task_id="Task 3"
)

queue_instance.enqueue(
    fn=foo,
    args=(5, "Task 4"),
    task_id="Task 4"
)

queue_instance.enqueue(
    fn=foo,
    args=(4, "Task 5"),
    task_id="Task 5",
    depend=["Task 3", "Task 4"]
)

Figure 2. Output of the example program, overview of the tasks' execution, and status of the queue and workers. (A) When the program starts, the queue is empty and both workers are idle. The five tasks are then sequentially added to the queue and the workers are notified to start working. (B) The first two tasks are extracted from the queue and executed by the workers. Each task prints a message in the terminal. (C) After four seconds, task 2 finishes and worker 2 becomes idle. Worker 2 then asks for a new task (task 3) and executes it. (D) When task 1 finishes, task 4 starts. (E) When task 3 is done, worker 2 becomes idle and waits for a new task; however, task 5 cannot start because it depends on task 4, which is still running on worker 1. (F) Once task 4 finishes, task 5 becomes eligible for execution by any worker. (G) Finally, after 4 seconds, task 5 finishes.
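For comparison, the same two-worker schedule, including the dependency of task 5 on tasks 3 and 4, can be reproduced with the standard library's concurrent.futures. This is a sketch that mimics the example, not PySiQ code; the delays are shortened here:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

events = []  # records the start/finish order of the tasks

def foo(n, message):
    events.append(message + " started")
    time.sleep(n)
    events.append(message + " finished")

with ThreadPoolExecutor(max_workers=2) as pool:  # two workers, as in the example
    f1 = pool.submit(foo, 0.10, "Task 1")
    f2 = pool.submit(foo, 0.04, "Task 2")
    f3 = pool.submit(foo, 0.10, "Task 3")
    f4 = pool.submit(foo, 0.05, "Task 4")
    wait([f3, f4])  # task 5 depends on tasks 3 and 4
    f5 = pool.submit(foo, 0.04, "Task 5")
```

Blocking on `wait([f3, f4])` before submitting the last job plays the role of PySiQ's `depend` parameter, so "Task 5 started" can never appear before tasks 3 and 4 have finished.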

Example of use 2 (file test/test_2.py)

The following code exemplifies the usage of all the available options. For this use case, a queue instance is initialized with two workers and three tasks are sent to the queue. All tasks execute the same function, foo, which opens and appends to a given file and then waits n seconds. The parameters of the function are the identifier of the task, the path to the file, and the duration of the delay.

Task 1 takes 10 seconds to execute, while tasks 2 and 3 take 5 seconds each. Task 3 will not start until task 2 has completed (i.e. it depends on task 2), and task 2 will not start until task 1 is done, as it is declared incompatible with task 1.

Figure 3 shows the timeline of the execution of the tasks, as well as the status of the queue and the workers at different time points.

# ************************************************************************
# Initialize queue
# ************************************************************************
from pysiq import Queue  # adjust the import to match your PySiQ installation

N_WORKERS = 2

queue_instance = Queue()
queue_instance.start_worker(N_WORKERS)


# ************************************************************************
# Queue tasks
# ************************************************************************
import os
from time import sleep

def foo(task_id, file_path, delay):
    # expanduser resolves the "~" in the path; open() does not expand it by itself
    log_file = open(os.path.expanduser(file_path), "a")
    log_file.write("This is task " + str(task_id) + ". I'm safely writing in this file.")
    sleep(delay)
    log_file.close()

queue_instance.enqueue(
    fn=foo,
    args=(1, "~/test_pysiq.log", 10),
    task_id=1
)

queue_instance.enqueue(
    fn=foo,
    args=(2, "~/test_pysiq.log", 5),
    task_id=2,
    incompatible=["foo"]
)

queue_instance.enqueue(
    fn=foo,
    args=(3, "~/test_pysiq.log", 5),
    task_id=3,
    depend=[2]
)
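The incompatible option serializes tasks that would otherwise interfere, here, several tasks appending to the same file. Its effect is comparable to guarding the shared file with a lock. The following is a stdlib analogy, not how PySiQ implements the feature; the log path is created in a temporary directory just for the example:

```python
import os
import tempfile
import threading

file_lock = threading.Lock()  # plays the role of the "incompatible" declaration

def foo(task_id, file_path):
    # Only one conflicting task may append to the shared file at a time.
    with file_lock:
        with open(file_path, "a") as log_file:
            log_file.write("This is task %d. I'm safely writing in this file.\n" % task_id)

log_path = os.path.join(tempfile.mkdtemp(), "test_pysiq.log")
threads = [threading.Thread(target=foo, args=(i, log_path)) for i in (1, 2, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each writer acquires the lock before touching the file, so the three lines end up intact in the log regardless of how the threads are scheduled.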

Advanced use

Figure 4 shows a more complex usage example. In this use case, two different users interact with the queue by sending requests to a server-side program. More specifically, each user first sends a request to the server in order to launch a certain time-consuming task. For each client, a Task instance is created and sent to the queue, and the identifier of the new job is returned to the client application. While waiting for the task to end, the clients periodically send requests to query the execution status. Finally, when the task is finished, the user can retrieve the result of the execution.

Figure 4. Using the queue system in a client-server environment. On the client side, two different users send a request to execute some time-consuming task on the server side (1). When a request is received, the server program wraps the job to be executed, together with its parameters, into a Task instance and sends the new object to the queue system (2). When a task is queued, an identifier is returned that uniquely identifies the task in the queue system. Task identifiers are then returned to the corresponding clients, which can use them later to check the execution status or retrieve the results. Every time a new task is received, the queue system notifies the workers (3), and those that are idle extract and execute the next tasks in the queue (4). While a task is being executed, clients can periodically check the status of the job by sending a request to the server (5). Possible statuses are "queued", "running", "finished", and "failed". When a worker finishes the execution of a task, the result is kept in the queue system until it is requested, and the worker proceeds to execute the next task in the queue (6). If no new tasks are available, the worker becomes idle. Finally, when the client detects that the task is done, i.e. it receives a "finished" status (7), a new request is sent in order to retrieve the results of the execution (8).
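The client side of this protocol (steps 5 to 8) reduces to a poll-then-fetch loop. Below is a minimal sketch, where check_status and get_result stand in for the HTTP requests to the server; they are hypothetical names, not PySiQ's API:

```python
import time

def wait_for_result(check_status, get_result, task_id, poll_interval=0.05):
    """Poll the server until the task is done, then fetch its result."""
    while True:
        state = check_status(task_id)      # step 5: query the execution status
        if state == "finished":
            return get_result(task_id)     # step 8: retrieve the result
        if state == "failed":
            raise RuntimeError("task %s failed" % task_id)
        time.sleep(poll_interval)          # back off between polls
```

In the setting of Figure 4, each check_status call would be one HTTP request; the loop ends as soon as the reported status becomes "finished" or "failed".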

Using PySiQ as a self-contained service

Coming soon.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].