persist-queue - A thread-safe, disk-based queue for Python

.. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac
    :target: https://circleci.com/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows
    :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue

.. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg
    :target: https://codecov.io/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/pypi/v/persist-queue.svg
    :target: https://pypi.python.org/pypi/persist-queue

persist-queue implements a file-based queue and a series of sqlite3-based queues. The goal is to meet the following requirements:

  • Disk-based: each queued item should be stored on disk so that it survives a crash.
  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
  • Recoverable: Items can be read after process restart.
  • Green-compatible: can be used in greenlet or eventlet environment.

Neither queuelib nor python-pqueue fulfils all of the above. After some experiments, I found it hard to get there from their current implementations without large code changes. That is the motivation for this project.

By default, persist-queue uses the pickle object serialization module to support object instances. Most built-in types, such as int, dict and list, can be persisted by persist-queue directly. To support customized objects, please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_ and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_.
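Because the default serializer is pickle, an item can only be queued if pickle accepts it. The following is a minimal sketch of a pre-flight check that uses the standard library only. The helper name ``is_picklable`` is hypothetical and is not part of persist-queue.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # Locks, open sockets, lambdas and similar objects end up here.
        return False
    return True


print(is_picklable({"id": 1, "tags": ["a", "b"]}))  # plain built-in types
print(is_picklable(threading.Lock()))               # a lock cannot be pickled
```

Objects that fail such a check usually need the customization hooks described in the pickle documentation linked above.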

This project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_ and `queuelib <https://github.com/scrapy/queuelib>`_.

Slack channels
^^^^^^^^^^^^^^

Join `persist-queue <https://join.slack.com/t/persist-queue/shared_invite/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel

Requirements

  • Python 2.7 or Python 3.x.
  • Full support for Linux.
  • Windows support (with Caution_ if persistqueue.Queue is used).

Features

  • Multiple platforms support: Linux, macOS, Windows
  • Pure python
  • Both file based queues and sqlite3 based queues are supported
  • File based queue: multiple serialization protocols supported: pickle (default), msgpack, json

Installation

from pypi
^^^^^^^^^

.. code-block:: console

pip install persist-queue
# for msgpack support, use following command
pip install persist-queue[extra]

from source code
^^^^^^^^^^^^^^^^

.. code-block:: console

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
# for msgpack support, run 'pip install -r extra-requirements.txt' first
python setup.py install

Benchmark

Here is the time spent (in seconds) writing/reading 1000 items to the disk, comparing the sqlite3 queue and the file queue.

  • Windows
    • OS: Windows 10
    • Disk: SATA3 SSD
    • RAM: 16 GiB

+---------------+---------+-------------------------+----------------------------+
|               | Write   | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+---------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8880  | 2.0290                  | 3.5940                     |
+---------------+---------+-------------------------+----------------------------+
| File Queue    | 4.9520  | 5.0560                  | 8.4900                     |
+---------------+---------+-------------------------+----------------------------+

Windows note: performance of the file queue on Windows has improved dramatically since v0.4.1 thanks to atomic renaming support (3-4X faster).

  • Linux
    • OS: Ubuntu 16.04 (VM)
    • Disk: SATA3 SSD
    • RAM: 4 GiB

+---------------+--------+-------------------------+----------------------------+
|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8282 | 1.8075                  | 2.8639                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 0.9123 | 1.0411                  | 2.5104                     |
+---------------+--------+-------------------------+----------------------------+

  • Mac OS
    • OS: 10.14 (macOS Mojave)
    • Disk: PCIe SSD
    • RAM: 16 GiB

+---------------+--------+-------------------------+----------------------------+
|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 0.1879 | 0.2115                  | 0.3147                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 0.5158 | 0.5357                  | 1.0446                     |
+---------------+--------+-------------------------+----------------------------+

note

  • The values above are in seconds for writing/reading 1000 items; lower is better.
  • The results above were obtained with:

.. code-block:: console

python benchmark/run_benchmark.py 1000

To see the real performance on your host, run the script under benchmark/run_benchmark.py:

.. code-block:: console

python benchmark/run_benchmark.py <COUNT, default to 100>

Examples

Example usage with a SQLite3 based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Close the console, and then recreate the queue:

.. code-block:: python

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.get()
'str2'

Example usage of SQLite3 based UniqueQ
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This queue does not allow duplicate items.

.. code-block:: python

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')
>>> q.size
1
>>> q.put('str2')
>>> q.size
2

Example usage of SQLite3 based SQLiteAckQueue/UniqueAckQ
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The core functions:

  • get: get an item from the queue and mark it as unack
  • ack: mark the item as acked
  • nack: there might be something wrong with the current consumer, so mark the item as ready and a new consumer will get it
  • ack_failed: there might be something wrong during processing, so just mark the item as failed
  • clear_acked_data: perform a sql delete against sqlite; it removes the latest 1000 items whose status is AckStatus.acked (note: this does not shrink the file size on disk)
  • shrink_disk_usage: perform a VACUUM against sqlite and rebuild the database file; this usually takes a long time and frees a lot of disk space after clear_acked_data

.. code-block:: python

>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Do something with the item
>>> ackq.ack(item) # If done with the item
>>> ackq.nack(item) # Else mark item as nack so that it can be processed again by any worker
>>> ackq.ack_failed(item) # Or else mark item as ack_failed to discard this item

Note:

  1. The SQLiteAckQueue always uses "auto_commit=True".
  2. The Queue could be set in non-block style, e.g. "SQLiteAckQueue.get(block=False, timeout=5)".
  3. UniqueAckQ only allows for unique items
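The difference between clear_acked_data (a delete) and shrink_disk_usage (a VACUUM) can be observed with the standard sqlite3 module alone. The sketch below does not use persist-queue. The table name ``items`` and the helper ``file_sizes_after_delete_and_vacuum`` are hypothetical and only illustrate the underlying SQLite behaviour.

```python
import os
import shutil
import sqlite3
import tempfile


def file_sizes_after_delete_and_vacuum(rows=2000):
    """Return (size after DELETE, size after VACUUM) in bytes for a scratch database."""
    workdir = tempfile.mkdtemp()
    path = os.path.join(workdir, "scratch.db")
    # isolation_level=None puts the connection in autocommit mode,
    # which VACUUM needs because it cannot run inside a transaction.
    conn = sqlite3.connect(path, isolation_level=None)
    try:
        conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, data BLOB)")
        conn.execute("BEGIN")
        conn.executemany(
            "INSERT INTO items (data) VALUES (?)",
            ((b"x" * 1024,) for _ in range(rows)),
        )
        conn.execute("COMMIT")

        conn.execute("DELETE FROM items")  # rows are gone, pages are only marked free
        after_delete = os.path.getsize(path)

        conn.execute("VACUUM")             # rebuilds the file without the free pages
        after_vacuum = os.path.getsize(path)
    finally:
        conn.close()
        shutil.rmtree(workdir, ignore_errors=True)
    return after_delete, after_vacuum


after_delete, after_vacuum = file_sizes_after_delete_and_vacuum()
print("after DELETE:", after_delete, "bytes; after VACUUM:", after_vacuum, "bytes")
```

The file stays large after the delete and only shrinks once VACUUM has rebuilt it, which is why the two operations are exposed separately.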

Example usage with a file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

Close the python console, and then restart the queue from the same path:

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()

Example usage with an auto-saving file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Available since: v0.5.0

By default, items added to the queue are persisted during the put() call, and items removed from a queue are only persisted when task_done() is called.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'
>>> q.get()
'b'

After exiting and restarting the queue from the same path, we see the items remain in the queue, because task_done() wasn't called before.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'a'
>>> q.get()
'b'

This can be advantageous. For example, if your program crashes before finishing processing an item, it will remain in the queue after restarting. You can also spread out the task_done() calls for performance reasons to avoid lots of individual writes.

Using autosave=True on a file based queue will automatically save on every call to get(). Calling task_done() is not necessary, but may still be used to join() against the queue.

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue("mypath", autosave=True)
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'

After exiting and restarting the queue from the same path, only the second item remains:

.. code-block:: python

>>> from persistqueue import Queue
>>> q = Queue('mypath', autosave=True)
>>> q.get()
'b'

Example usage with a SQLite3 based dict
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue\pdict.py", line 58, in __getitem__
    raise KeyError('Key: {} not exists.'.format(item))
KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict:

.. code-block:: python

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key2']
321

Multi-thread usage for SQLite3 based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

from threading import Thread

from persistqueue import FIFOSQLiteQueue

q = FIFOSQLiteQueue(path="./test", multithreading=True)

# do_work(), source() and num_worker_threads are placeholders
# that the application is expected to provide.

def worker():
    while True:
        item = q.get()
        do_work(item)

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

multi-thread usage for Queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

from threading import Thread

from persistqueue import Queue

q = Queue("mypath")

# do_work(), source() and num_worker_threads are placeholders
# that the application is expected to provide.

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

note

Due to the limitation of the file queue described in `issue #89 <https://github.com/peter-wangxu/persist-queue/issues/89>`_, task_done in one thread may acknowledge items that belong to other threads, which it should not. Consider SQLiteAckQueue if you have such a requirement.

Serialization via msgpack/json
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  • v0.4.1: Currently only available for file based Queue
  • v0.4.2: Also available for SQLite3 based Queues

.. code-block:: python

>>> import persistqueue
>>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
>>> # via json
>>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.json)
>>> q.get()
'b'
>>> q.task_done()
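One practical consequence of switching serializers is that not every format preserves Python types. The sketch below uses the standard json and pickle modules directly, not persistqueue.serializers, so it only illustrates the formats themselves. The sample item is hypothetical.

```python
import json
import pickle

item = {"id": 7, "tags": ("urgent", "billing")}

via_pickle = pickle.loads(pickle.dumps(item))
via_json = json.loads(json.dumps(item))

print(type(via_pickle["tags"]).__name__)  # pickle keeps the tuple
print(type(via_json["tags"]).__name__)    # JSON has no tuple type, so a list comes back
```

Consumers that depend on exact types may therefore need a small conversion step when a non-pickle serializer is chosen.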

Explicit resource reclaim
^^^^^^^^^^^^^^^^^^^^^^^^^

For some reasons, an application may need to explicitly reclaim file handles or sql connections before the end of execution. In these cases, the user can simply call:

.. code-block:: python

q = Queue('mypath') # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
del q

to reclaim related file handles or sql connections.

Tips

task_done is required for both the file based queue and the SQLite3 based queue (when auto_commit=False) to persist the cursor of the next get to the disk.
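The reason an explicit persist step matters can be seen at the SQLite level with the standard library alone. The sketch below is not persist-queue code. The table ``items`` and the helper ``count_rows`` are hypothetical, and the assumption is only that uncommitted changes behave for a queue as they do for any sqlite3 connection.

```python
import os
import sqlite3
import tempfile


def count_rows(path):
    """Open a fresh connection and count the rows that actually reached the disk."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    finally:
        conn.close()


path = os.path.join(tempfile.mkdtemp(), "scratch.db")

conn = sqlite3.connect(path)
conn.execute("CREATE TABLE items (data TEXT)")
conn.commit()

conn.execute("INSERT INTO items VALUES ('a')")
conn.commit()                                   # 'a' is now durable
conn.execute("INSERT INTO items VALUES ('b')")  # never committed
conn.close()                                    # the pending change is discarded

print(count_rows(path))
```

Only the committed row survives, which mirrors why changes made with auto_commit=False are lost unless task_done is called.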

Performance impact

  • WAL

    Starting from v0.3.2, persistqueue leverages the sqlite3 built-in feature `WAL <https://www.sqlite.org/wal.html>`_, which can improve performance significantly. General testing indicates that persistqueue is 2-4 times faster than previous versions.

  • auto_commit=False

    Since persistqueue v0.3.0, a new parameter auto_commit is available to tune the performance of sqlite3 based queues as needed. When auto_commit=False is specified, the user needs to call queue.task_done() to persist the changes made since the last task_done invocation to the disk.

  • pickle protocol selection

    From v0.3.6, persistqueue selects pickle protocol version 2 for python2 and protocol version 4 for python3. This selection only happens when the directory is not present when initializing the queue.
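Two of the points above, WAL mode and the pickle protocol version, can be inspected with the standard library. The sketch below does not show how persistqueue applies them internally. The helpers ``enable_wal`` and ``pickle_protocol_of`` are hypothetical names used only for illustration.

```python
import os
import pickle
import sqlite3
import tempfile


def enable_wal(path):
    """Switch a database file to write-ahead logging and return the active journal mode."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    finally:
        conn.close()


def pickle_protocol_of(payload):
    """Read the protocol number from a pickle written with protocol 2 or later."""
    # Such pickles start with the PROTO opcode (0x80) followed by the version byte.
    if payload[:1] != b"\x80":
        raise ValueError("payload was written with protocol 0 or 1")
    return bytearray(payload)[1]


path = os.path.join(tempfile.mkdtemp(), "scratch.db")
print(enable_wal(path))
print(pickle_protocol_of(pickle.dumps({"k": 1}, protocol=2)))
```

Checking the protocol byte of stored data is one way to tell whether a queue directory was created under python2 or python3, assuming the selection rule described above.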

Tests

persist-queue uses tox to trigger tests.

  • Unit test

.. code-block:: console

tox -e <PYTHON_VERSION>

Available <PYTHON_VERSION>: py27, py34, py35, py36, py37

  • PEP8 check

.. code-block:: console

tox -e pep8

`pyenv <https://github.com/pyenv/pyenv>`_ is usually a helpful tool to manage multiple versions of Python.

Caution

Currently, the atomic operation is supported on Windows but is still experimental. That is to say, the data in persistqueue.Queue could be left in an unreadable state if an incidental failure occurs during Queue.task_done.

DO NOT put any critical data on persistqueue.Queue on Windows.
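For readers unfamiliar with the atomic operation mentioned above, the usual pattern is to write to a temporary file and rename it over the target. The sketch below shows that general pattern with the standard library (Python 3.3+ because of os.replace). It is an assumption-laden illustration, not persist-queue's actual implementation, and ``atomic_write`` is a hypothetical helper.

```python
import os
import tempfile


def atomic_write(path, data):
    """Write bytes to path so that readers see either the old or the new content."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temporary file lives in the same directory as the target,
    # because a rename across filesystems cannot be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # overwrites the target in one step
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


target = os.path.join(tempfile.mkdtemp(), "info")
atomic_write(target, b"head=0")
atomic_write(target, b"head=1")
with open(target, "rb") as f:
    content = f.read()
print(content)
```

If the process dies before the rename, the target still holds the previous content; the risk described in the caution arises when that guarantee is not fully available on a platform.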

Contribution

Simply fork this repo and send a PR for your code change (along with tests that cover it). Remember to give your PR a title and a description. I am willing to enhance this project with you :).

License

`BSD <LICENSE>`_

Contributors

`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_

FAQ

  • sqlite3.OperationalError: database is locked is raised.

persistqueue opens 2 connections for the db if multithreading=True, and the SQLite database stays locked until the current transaction is committed. The timeout parameter specifies how long a connection should wait for the lock to go away before raising an exception. The default timeout is 10 seconds; increase timeout when creating the queue if the above error occurs.

  • sqlite3 based queues are not thread-safe.

The sqlite3 queues are heavily tested in multi-threading environments. If you find that they are not thread-safe, please make sure you set multithreading=True when initializing the queue before submitting a new issue :).
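The "database is locked" error from the first question can be reproduced with two plain sqlite3 connections, which also shows what the timeout controls. This sketch uses the standard library only. The table ``items`` and the connection names are hypothetical, and the 0.1 second timeout is deliberately short so the example finishes quickly.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "scratch.db")

writer = sqlite3.connect(path, isolation_level=None)  # autocommit mode
writer.execute("CREATE TABLE items (data TEXT)")
writer.execute("BEGIN IMMEDIATE")                      # take the write lock
writer.execute("INSERT INTO items VALUES ('a')")

# A second connection that gives up after 0.1 seconds.
impatient = sqlite3.connect(path, timeout=0.1)
try:
    impatient.execute("INSERT INTO items VALUES ('b')")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)
    impatient.rollback()                               # drop the failed transaction
print(error)

writer.execute("COMMIT")                               # release the lock
impatient.execute("INSERT INTO items VALUES ('b')")    # now succeeds
impatient.commit()
count = impatient.execute("SELECT COUNT(*) FROM items").fetchone()[0]
print(count)

writer.close()
impatient.close()
```

A longer timeout gives the second connection more time to wait for the commit instead of failing, which is the effect of raising timeout when creating the queue.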
