All Projects → nipuntalukdar → geeteventbus

nipuntalukdar / geeteventbus

Licence: MIT License
An inprocess eventbus for highly concurrent Python applications

Programming Languages

python
139335 projects - #7 most used programming language

Projects that are alternatives of or similar to geeteventbus

Libzmq
ZeroMQ core engine in C++, implements ZMTP/3.1
Stars: ✭ 7,418 (+43535.29%)
Mutual labels:  messaging, concurrency
Linux-Kernel-Driver-Programming
Implementation of PCI drivers, kprobe, sysfs, devfs, sensor driver, miscdevices, synchronization
Stars: ✭ 43 (+152.94%)
Mutual labels:  concurrency, concurrent-programming
Zeromq Ng
⚡️ Next-generation Node.js bindings to the ZeroMQ library
Stars: ✭ 45 (+164.71%)
Mutual labels:  messaging, concurrency
Tascalate Concurrent
Implementation of blocking (IO-Bound) cancellable java.util.concurrent.CompletionStage and related extensions to java.util.concurrent.ExecutorService-s
Stars: ✭ 144 (+747.06%)
Mutual labels:  concurrency, concurrent-programming
TAOMP
《多处理器编程的艺术》一书中的示例代码实现,带有注释与单元测试
Stars: ✭ 39 (+129.41%)
Mutual labels:  concurrency, concurrent-programming
Sobjectizer
An implementation of Actor, Publish-Subscribe, and CSP models in one rather small C++ framework. With performance, quality, and stability proved by years in the production.
Stars: ✭ 172 (+911.76%)
Mutual labels:  concurrency, concurrent-programming
Actors.jl
Concurrent computing in Julia based on the Actor Model
Stars: ✭ 95 (+458.82%)
Mutual labels:  concurrency, concurrent-programming
Fucking Java Concurrency
🎏 Simple show cases of java concurrency problems, seeing 🙈 is believing 🐵
Stars: ✭ 779 (+4482.35%)
Mutual labels:  concurrency, concurrent-programming
scalable-concurrent-containers
High performance containers and utilities for concurrent and asynchronous programming
Stars: ✭ 101 (+494.12%)
Mutual labels:  concurrency, concurrent-programming
java-multithread
Códigos feitos para o curso de Multithreading com Java, no canal RinaldoDev do YouTube.
Stars: ✭ 24 (+41.18%)
Mutual labels:  concurrency, concurrent-programming
Chymyst Core
Declarative concurrency in Scala - The implementation of the chemical machine
Stars: ✭ 142 (+735.29%)
Mutual labels:  concurrency, concurrent-programming
transit
Massively real-time city transit streaming application
Stars: ✭ 20 (+17.65%)
Mutual labels:  concurrency, concurrent-programming
Awesome Lockfree
A collection of resources on wait-free and lock-free programming
Stars: ✭ 1,046 (+6052.94%)
Mutual labels:  concurrency, concurrent-programming
Concurrent Map
a thread-safe concurrent map for go
Stars: ✭ 2,627 (+15352.94%)
Mutual labels:  concurrency, concurrent-programming
Mt
tlock, RWMUTEX, Collab, USM, RSem and other C++ templates for Windows to provide read/write mutex locks, various multithreading tools, collaboration, differential updates and more
Stars: ✭ 18 (+5.88%)
Mutual labels:  concurrency, concurrent-programming
go-left-right
A faster RWLock primitive in Go, 2-3 times faster than RWMutex. A Go implementation of concurrency control algorithm in paper <Left-Right - A Concurrency Control Technique with Wait-Free Population Oblivious Reads>
Stars: ✭ 42 (+147.06%)
Mutual labels:  concurrency, concurrent-programming
Concurrencpp
Modern concurrency for C++. Tasks, executors, timers and C++20 coroutines to rule them all
Stars: ✭ 340 (+1900%)
Mutual labels:  concurrency, concurrent-programming
Java Concurrency Progamming Tutorial
BAT华为大厂一线工程师四年磨一剑精心编排 Java 高并发编程案例代码 & 教程 & 面试题集锦。详细文档讲解请阅读本人的知识库仓:https://github.com/Wasabi1234/Java-Interview-Tutorial
Stars: ✭ 606 (+3464.71%)
Mutual labels:  concurrency, concurrent-programming
practice
Java并发编程与高并发解决方案:http://coding.imooc.com/class/195.html Java开发企业级权限管理系统:http://coding.imooc.com/class/149.html
Stars: ✭ 39 (+129.41%)
Mutual labels:  concurrency, concurrent-programming
traffic
Massively real-time traffic streaming application
Stars: ✭ 25 (+47.06%)
Mutual labels:  concurrency, concurrent-programming

geeteventbus

An eventbus for concurrent programming

geeteventbus is a library that allows publish-subscribe-style communication. There is no need for the components to register to each-other. It is inspired by a Java library, Guava eventbus from Google. But it is not exactly same as the Guava eventbus library.

  • geeteventbus simplifies handling events from publishers and subscribers.
  • publisher and subscribers don't need to create threads to concurrently process the events.
  • the eventbus can be synchronus, where the events are delivered from the same thread posting the events
  • events can be delivered to subscibers in the same order they are posted
  • subscribers may be declared as thread-safe, in that case same subscriber may be invoked concurrently for processing multiple events
  • events for which there are no subscribers are registered yet are simply discared by the eventbus.
  • the eventbus is not to be used for inter process communication. Publishers and subsribers must run on the same process

Basic working

  1. We create an eventbus

    from geeteventbus.eventbus import eventbus
    eb = eventbus()

    This will create an eventbus with the defaults. The default eventbus will have below characteristics:

    1. the maximum queued event limit is set to 10000
    2. number of executor thread is 8
    3. the subscribers will be called asynchronously
    4. subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously on different threads
  2. Create a subsclass of subscriber and override the process method. Create an object of this class and register it to the eventbus for receiving messages with certain topics:

    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic "an_important_topic".

    from geeteventbus.event import event
    
    eobj1 = event('an_important_topic', 'This is some data for the event 1')
    eobj2 = event('an_important_topic', 'This is some data for the event 2')
    eobj3 = event('an_important_topic', 'This is some data for the event 3')
    eobj3 = event('an_important_topic', 'This is some data for the event 4')
    
    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)
  4. We may gracefully shutdown the eventbus before exiting the process

    eb.shutdown()

The complete example is below:

from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
            return
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))


eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')


eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

eb.post(eobj1)
eb.post(eobj2)
eb.post(eobj3)
eb.post(eobj4)

eb.shutdown()
sleep(2)

A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself to an eventbus for receiving events for the counters(topics). A set of producers update the values for the counters and post events describing the counter to the eventbus:

from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint


class counter_aggregator(subscriber, Thread):
    '''
    Aggregator for a set of counters. Multiple threads updates the counts which
    are aggregated by this class and output the aggregated value periodically.
    '''
    def __init__(self, counter_names):
        Thread.__init__(self)
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        '''
        Process method calls with the event object eobj. eobj has the counter name as the topic
        and an int count as the value for the counter.
        '''
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
            return
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        '''
        Keep outputing the aggregated counts every 2 seconds
        '''
        while self.keep_running:
            sleep(2)
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')


class count_producer:
    '''
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    '''
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
            ebus.post(ev)
            sleep(0.02)
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        threads.append(Thread(target=producer))
        i -= 1

    aggregator_thread = Thread(target=subcr)
    aggregator_thread.start()
    for thrd in threads:
        thrd.start()
    sleep(20)
    producer.stop()
    subcr.stop()
    sleep(2)
    ebus.shutdown()
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].