
ontio / ontology-eventbus

Licence: Apache-2.0 license
The Go Language Implementation of Ontology Actor Model

Projects that are alternatives of or similar to ontology-eventbus

game-executor
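Uses the Reactor pattern: readycreate and readyfinish events are registered with the update service (UpdateService), which caches the model after processing and then converts the message into the create, update, and finish events needed by the dispatchThread message-dispatch model for single-threaded loop scheduling. Scheduling uses the system's built-in lock model as a multi-thread wake-up mechanism, so that all update loop checks are scheduled across multiple threads. The multi-threaded update service uses a future-listener mechanism: once scheduling completes, if the model is still alive the message is converted back into an update event and registered with the dispatchThread message-dispatch model for further loop processing; if the model is dead the message is converted into a readyfinish event and registered with the update service (UpdateService) for destruction. The system implements automatic model caching, multi…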
Stars: ✭ 28 (+16.67%)
Mutual labels:  eventbus
solr-ontology-tagger
Automatic tagging and analysis of documents in an Apache Solr index for faceted search by RDF(S) Ontologies & SKOS thesauri
Stars: ✭ 36 (+50%)
Mutual labels:  ontology
OEPs
Ontology Enhancement Proposal
Stars: ✭ 33 (+37.5%)
Mutual labels:  ontology
MASA.Contrib
The purpose of MASA.Contrib is based on MASA.BuildingBlocks to provide open, community driven reusable components for building mesh applications. These components will be used by the MASA Stack and MASA Labs projects.
Stars: ✭ 102 (+325%)
Mutual labels:  eventbus
mvp4g
A framework to build a gwt application the right way
Stars: ✭ 29 (+20.83%)
Mutual labels:  eventbus
ReactiveBus
🚍 Reactive Event Bus for JVM (1.7+) and Android apps built with RxJava 2
Stars: ✭ 17 (-29.17%)
Mutual labels:  eventbus
microservice-architecture-quick-start
Demonstrates how to build a Microservices Architecture using the C # language and the ASP.NET Core environment.
Stars: ✭ 16 (-33.33%)
Mutual labels:  eventbus
ITO
Intelligence Task Ontology (ITO)
Stars: ✭ 37 (+54.17%)
Mutual labels:  ontology
ontology
Global Insurance Ontology
Stars: ✭ 71 (+195.83%)
Mutual labels:  ontology
CASE
Cyber-investigation Analysis Standard Expression (CASE) Ontology
Stars: ✭ 46 (+91.67%)
Mutual labels:  ontology
ont-api
ONT-API (OWL-API over Apache Jena)
Stars: ✭ 20 (-16.67%)
Mutual labels:  ontology
vxrifa
Utility library for Vert.X that allows using strong-typed interfaces in communication through EventBus
Stars: ✭ 15 (-37.5%)
Mutual labels:  eventbus
IpcEventBus
Faster than Intents and easier than AIDLs.
Stars: ✭ 35 (+45.83%)
Mutual labels:  eventbus
lsw2
OWL and Semantic Web toolkit for Common Lisp, used for construction and reasoning over ontologies and ontology-structured data
Stars: ✭ 22 (-8.33%)
Mutual labels:  ontology
LiteBus
LiteBus is an easy-to-use and ambitious in-process mediator providing the foundation to implement CQS. It is implemented with minimum reflection usage and streamable query support.
Stars: ✭ 20 (-16.67%)
Mutual labels:  eventbus
HPO-translations
Internationalisation of the HPO content
Stars: ✭ 19 (-20.83%)
Mutual labels:  ontology
DelphiEventBus
Implementation of event bus pattern for Delphi XE
Stars: ✭ 32 (+33.33%)
Mutual labels:  eventbus
telephone-ts
Telephone-ts: The "Event Emitter-less" TypeScript Event Architecture.
Stars: ✭ 22 (-8.33%)
Mutual labels:  eventbus
thymeflow
Installer for Thymeflow, a personal knowledge management system.
Stars: ✭ 27 (+12.5%)
Mutual labels:  ontology
e
A library which combines a eventBus/emitter, DOM events management, delegated events, and event-based utils into a single lightweight and performant library.
Stars: ✭ 37 (+54.17%)
Mutual labels:  eventbus

Chinese version

The Go Language Implementation of Actor Model

inter-process communication

inter-node communication

ONT's Signature Verification Test

About Actor Model

The actor model is a model of concurrent computation in computer science that treats "actors" as the universal primitives of concurrent computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

The actor model adopts the philosophy that everything is an actor. This is similar to the "everything is an object" philosophy used by some object-oriented programming languages. However, whereas object-oriented programs typically execute sequentially, the actor model is inherently parallel.

Every actor has one (and only one) Mailbox. A Mailbox is like a small message queue: once a sender sends a message, the message is pushed into the Mailbox, in the order in which the messages were sent. A Mailbox can be implemented in several ways. The default is FIFO, but an implementation may instead pop messages according to priority.
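
The mailbox behaviour described above can be pictured with a small standalone sketch. It uses only the Go standard library; the names Mailbox, NewMailbox, Push, Pop, Close, and Drain are hypothetical and are not this project's API. It shows a sender that never blocks and a reader that sees messages in the order they were pushed.

```go
package main

import (
	"fmt"
	"sync"
)

// Mailbox is a hypothetical unbounded FIFO queue, not the library's own type.
type Mailbox struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []interface{}
	closed bool
}

func NewMailbox() *Mailbox {
	m := &Mailbox{}
	m.cond = sync.NewCond(&m.mu)
	return m
}

// Push appends a message to the tail; the sender never waits for the reader.
func (m *Mailbox) Push(msg interface{}) {
	m.mu.Lock()
	m.queue = append(m.queue, msg)
	m.mu.Unlock()
	m.cond.Signal()
}

// Close marks the mailbox as finished; messages already queued are still delivered.
func (m *Mailbox) Close() {
	m.mu.Lock()
	m.closed = true
	m.mu.Unlock()
	m.cond.Broadcast()
}

// Pop blocks until a message is available. ok is false once the mailbox
// has been closed and fully drained.
func (m *Mailbox) Pop() (msg interface{}, ok bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for len(m.queue) == 0 && !m.closed {
		m.cond.Wait()
	}
	if len(m.queue) == 0 {
		return nil, false
	}
	msg = m.queue[0]
	m.queue = m.queue[1:]
	return msg, true
}

// Drain plays the role of an actor's receive loop: one message at a time,
// in arrival order.
func Drain(m *Mailbox) []interface{} {
	var seen []interface{}
	for {
		msg, ok := m.Pop()
		if !ok {
			return seen
		}
		seen = append(seen, msg)
	}
}

func main() {
	mb := NewMailbox()
	for i := 1; i <= 3; i++ {
		mb.Push(i)
	}
	mb.Close()
	fmt.Println(Drain(mb)) // [1 2 3]
}
```

A priority mailbox would differ only in Pop, which would pick the highest-priority entry instead of the head of the queue.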

actor

Actor VS Channel

actor

channel

Advantages of Actor Model:

  1. An actor's Mailbox has unlimited capacity, so writers are never blocked while sending.

  2. All messages sent to a single actor share the same mailbox (channel).

  3. An actor does not need to know who wrote a message, which decouples the logic of different modules.

  4. Actors can be deployed on different nodes.

Disadvantage of Actor Model

  1. Because the actor model is designed to be asynchronous, synchronous calls are comparatively inefficient.

Create Actors

Props provides the basis for declaring how actors are created. The following example defines an actor's Props by declaring the function that processes messages.

var props Props = actor.FromFunc(func(c Context) {
	// process messages
})

Alternatively, the Actor interface can be implemented by creating a struct and defining a Receive function.

type MyActor struct {}

func (a *MyActor) Receive(c Context) {
	// process messages
}

var props Props = actor.FromProducer(func() Actor { return &MyActor{} })

Spawn and SpawnNamed use the given props to create a running instance of an actor. Once the actor is started, it begins to process the messages it receives. To start an actor under a unique name assigned by the system:

pid := actor.Spawn(props)

The return value is a unique PID. To choose the name of the PID yourself, start the actor with SpawnNamed instead.

Once an actor is started, a new mailbox is created and associated with its PID. Messages are sent to that mailbox and processed by the actor.

Process Messages

An actor processes messages through its Receive function, which is defined as:

Receive(c actor.Context)

The system guarantees that Receive is called for one message at a time, never concurrently for the same actor. Hence users do not need any additional synchronization around the actor's state.
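
As a rough illustration of why no extra locking is needed, the sketch below models an actor as a single goroutine that owns its state and reads from an inbox. It uses only the standard library; counterActor, startCounter, and SumConcurrently are hypothetical names, and a buffered channel stands in for the mailbox.

```go
package main

import (
	"fmt"
	"sync"
)

// counterActor is a hypothetical actor: only its own goroutine touches total.
type counterActor struct {
	inbox chan int
	total int
	done  chan struct{}
}

func startCounter() *counterActor {
	a := &counterActor{inbox: make(chan int, 64), done: make(chan struct{})}
	go func() {
		// Messages are handled strictly one after another,
		// so total is updated without a mutex.
		for n := range a.inbox {
			a.total += n
		}
		close(a.done)
	}()
	return a
}

// SumConcurrently lets several goroutines send to one actor at the same time
// and returns the actor's final state.
func SumConcurrently(senders, perSender int) int {
	a := startCounter()
	var wg sync.WaitGroup
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perSender; j++ {
				a.inbox <- 1
			}
		}()
	}
	wg.Wait()
	close(a.inbox)
	<-a.done // wait until the actor has drained its inbox
	return a.total
}

func main() {
	fmt.Println(SumConcurrently(8, 1000)) // 8000
}
```

The senders run in parallel, but every update to total happens on the actor's goroutine, which is the same guarantee the text describes for Receive.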

Communicate with other actors

PID is the main interface for sending messages to actors. The PID.Tell function sends a message to the PID asynchronously.

pid.Tell("Hello World")

Depending on business requirements, communication between actors can be synchronous or asynchronous. In both cases, actors address each other by PID.

When PID.Request or PID.RequestFuture is used to send a message, the receiving actor can reply to the sender through the Context.Sender function, which returns the PID of the sender.

Synchronous communication is implemented with a Future: the caller waits for the result before carrying out the next step.

Use the RequestFuture function to send a message to an actor and wait for the result. The function returns a Future:

f := pid.RequestFuture("Hello", 50*time.Millisecond)
res, err := f.Result() // waits for pid to reply
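
To make the Future idea concrete, here is a minimal standalone sketch built on a channel and a deadline. It is an assumption about how such a future could work, not the library's implementation; Future, RequestFuture, ErrTimeout, echo, and slowEcho here are hypothetical names defined in the sketch itself, and a plain function stands in for the receiving actor.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTimeout is a hypothetical error value; the library defines its own.
var ErrTimeout = errors.New("future: timeout")

// Future holds a reply that may arrive later, plus the deadline for waiting.
type Future struct {
	reply    chan interface{}
	deadline time.Time
}

// RequestFuture hands msg to handler on another goroutine and returns at once.
func RequestFuture(handler func(interface{}) interface{}, msg interface{}, timeout time.Duration) *Future {
	f := &Future{reply: make(chan interface{}, 1), deadline: time.Now().Add(timeout)}
	go func() { f.reply <- handler(msg) }() // buffered, so a late reply never blocks
	return f
}

// Result blocks until the reply arrives or the deadline passes.
func (f *Future) Result() (interface{}, error) {
	select {
	case r := <-f.reply:
		return r, nil
	case <-time.After(time.Until(f.deadline)):
		return nil, ErrTimeout
	}
}

// echo stands in for an actor that replies immediately.
func echo(m interface{}) interface{} { return fmt.Sprintf("reply to %v", m) }

// slowEcho stands in for an actor that takes too long to reply.
func slowEcho(m interface{}) interface{} {
	time.Sleep(200 * time.Millisecond)
	return echo(m)
}

func main() {
	res, err := RequestFuture(echo, "Hello", 50*time.Millisecond).Result()
	fmt.Println(res, err) // reply to Hello <nil>

	_, err = RequestFuture(slowEcho, "Hello", 20*time.Millisecond).Result()
	fmt.Println(err) // future: timeout
}
```

The caller pays the waiting cost only when it calls Result, so work can be done between sending the request and collecting the reply.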

Inter-process Communication

Performance

Asynchronous call

Protoactor-go can currently pass about 2,000,000 messages per second between two actors while guaranteeing the order of those messages.

/app/go/bin/go build -o "/tmp/Build performanceTest.go and rungo" 
/app/gopath/src/github.com/ontio/ontology-eventbus/example/performanceTest.go
start at time: 1516953710985385134
end at time 1516953716291953904
run time:10000000     elapsed time:5306 ms

Serial synchronous call

Protoactor-go can now pass more than 500,000 messages per second between client and server in a serial synchronous call.

goos: linux
goarch: amd64
pkg: github.com/ontio/ontology-eventbus/example/benchmark
benchmark                  iter                 time/iter          bytes alloc          allocs
---------                  ----                ---------           -----------          ------
BenchmarkSyncTest-4   	 1000000	      1967 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1987 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1952 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1975 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4   	 1000000	      1987 ns/op	     432 B/op	      13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
PASS
ok  	github.com/ontio/ontology-eventbus/example/benchmarks	10.984s
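
The throughput figures quoted in this section can be checked against the logged numbers with a little arithmetic. The helper names below are hypothetical; the inputs are taken from the output above (10,000,000 messages in 5306 ms, and 1967 ns/op).

```go
package main

import "fmt"

// PerSecondFromTotal converts a message count and an elapsed time in
// milliseconds into messages per second.
func PerSecondFromTotal(messages int, elapsedMs int) float64 {
	return float64(messages) / (float64(elapsedMs) / 1000.0)
}

// PerSecondFromNsPerOp converts a benchmark's ns/op figure into
// operations per second.
func PerSecondFromNsPerOp(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

func main() {
	fmt.Printf("async: %.0f msg/s\n", PerSecondFromTotal(10000000, 5306))
	fmt.Printf("sync:  %.0f msg/s\n", PerSecondFromNsPerOp(1967))
}
```

The asynchronous run works out to roughly 1.88 million messages per second, and the synchronous benchmark to a little over 508,000 calls per second.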

Hello world

type Hello struct{ Who string }
type HelloActor struct{}

func (state *HelloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    props := actor.FromProducer(func() actor.Actor { return &HelloActor{} })
    pid := actor.Spawn(props)
    pid.Tell(Hello{Who: "Roger"})
    console.ReadLine()
}

Two actors communicate with each other

This example shows asynchronous communication between two actors. It defines what an actor does after receiving a message (Receive): how the message is processed and which actor the processed message is sent to. Asynchronous communication keeps both actors busy instead of waiting on each other.

type ping struct{ val int }
type pingActor struct{}

func (state *pingActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started, initialize actor here")
	case *actor.Stopping:
		fmt.Println("Stopping, actor is about to shut down")
	case *actor.Restarting:
		fmt.Println("Restarting, actor is about to restart")
	case *ping:
		val := msg.val
		if val < 10000000 {
			context.Sender().Request(&ping{val: val + 1}, context.Self())
		} else {
			end := time.Now().UnixNano()
			fmt.Printf("%s end %d\n", context.Self().Id, end)
		}
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	props := actor.FromProducer(func() actor.Actor { return &pingActor{} })
	actora := actor.Spawn(props)
	actorb := actor.Spawn(props)
	fmt.Printf("begin time %d\n", time.Now().UnixNano())
	actora.Request(&ping{val: 1}, actorb)
	time.Sleep(10 * time.Second)
	actora.Stop()
	actorb.Stop()
	console.ReadLine()
}

Server/Client synchronous call

This example shows how a client communicates synchronously with an actor (the server). The client sends a request message to the actor and waits for the actor to return the result. Handling the request may require several actors to cooperate; those actors communicate asynchronously, as in the example above, and the final result is returned to the client.

message.go

type Request struct {
	Who string
}

type Response struct {
	Welcome string
}

server.go

type Server struct {}

func (server *Server) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		fmt.Println("Started, initialize server actor here")
	case *actor.Stopping:
		fmt.Println("Stopping, actor is about to shut down")
	case *actor.Restarting:
		fmt.Println("Restarting, actor is about to restart")
	case *message.Request:
		fmt.Println("Receive message", msg.Who)
		context.Sender().Request(&message.Response{Welcome: "Welcome!"}, context.Self())
	}
}

func (server *Server) Start() *actor.PID{
	props := actor.FromProducer(func() actor.Actor { return &Server{} })
	pid := actor.Spawn(props)
	return pid
}

func (server *Server) Stop(pid *actor.PID) {
	pid.Stop()
}

client.go

type Client struct {}


//Call the server synchronously
func (client *Client) SyncCall(serverPID *actor.PID) (interface{}, error) {
	future := serverPID.RequestFuture(&message.Request{Who: "Ontology"}, 10*time.Second)
	result, err := future.Result()
	return result, err
}

main.go

func main() {
	server := &server.Server{}
	client := &client.Client{}
	serverPID := server.Start()
	result, err := client.SyncCall(serverPID)
	if err != nil {
		fmt.Println("ERROR:", err)
	}
	fmt.Println(result)
}

EventHub

Actors can broadcast and subscribe through EventHub, which supports the ALL, ROUNDROBIN, and RANDOM broadcast modes.

Example

package main

import (
	"fmt"
	"time"

	"github.com/ontio/ontology-eventbus/actor"
	"github.com/ontio/ontology-eventbus/eventhub"
)


type PubMessage struct{
	message string
}

type ResponseMessage struct{
	message string
}

func main() {

	eh:= eventhub.GlobalEventHub
	subprops := actor.FromFunc(func(context actor.Context) {
		switch msg := context.Message().(type) {

		case PubMessage:
			fmt.Println(context.Self().Id + " get message "+msg.message)
			context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())

		default:

		}
	})

	pubprops := actor.FromFunc(func(context actor.Context) {
		switch msg := context.Message().(type) {

		case ResponseMessage:
			fmt.Println(context.Self().Id + " get message "+msg.message)
			//context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())

		default:
			//fmt.Println("unknown message type")
		}
	})


	publisher, _ := actor.SpawnNamed(pubprops, "publisher")
	sub1, _ := actor.SpawnNamed(subprops, "sub1")
	sub2, _ := actor.SpawnNamed(subprops, "sub2")
	sub3, _ := actor.SpawnNamed(subprops, "sub3")

	topic:= "TEST"
	eh.Subscribe(topic,sub1)
	eh.Subscribe(topic,sub2)
	eh.Subscribe(topic,sub3)

	event := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyAll}
	eh.Publish(event)
	time.Sleep(2*time.Second)
	fmt.Println("before unsubscribe sleeping...")
	eh.Unsubscribe(topic,sub2)
	eh.Publish(event)
	time.Sleep(2*time.Second)

	fmt.Println("random event...")
	randomevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRandom}
	for i:=0 ;i<10;i++{
		eh.Publish(randomevent)
	}

	time.Sleep(2*time.Second)

	fmt.Println("roundrobin event...")
	roundevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRoundRobin}
	for i:=0 ;i<10;i++{
		eh.Publish(roundevent)
	}

	time.Sleep(2*time.Second)
}
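
How the three publish policies differ can be sketched without the library. The Hub, Policy, and Targets names below are hypothetical and single-threaded; the sketch only models which subscribers one publish would reach under each policy, assuming ALL means every subscriber, ROUNDROBIN means the next subscriber in turn, and RANDOM means one subscriber picked at random.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Policy mirrors the three modes named in the text; the type is hypothetical.
type Policy int

const (
	PolicyAll Policy = iota
	PolicyRoundRobin
	PolicyRandom
)

// Hub is a hypothetical stand-in for an event hub, safe for one goroutine only.
type Hub struct {
	subs map[string][]string // topic -> subscriber names
	next map[string]int      // topic -> round-robin cursor
}

func NewHub() *Hub {
	return &Hub{subs: map[string][]string{}, next: map[string]int{}}
}

func (h *Hub) Subscribe(topic, sub string) {
	h.subs[topic] = append(h.subs[topic], sub)
}

// Unsubscribe removes sub from topic if it is present.
func (h *Hub) Unsubscribe(topic, sub string) {
	list := h.subs[topic]
	for i, s := range list {
		if s == sub {
			h.subs[topic] = append(list[:i], list[i+1:]...)
			return
		}
	}
}

// Targets returns the subscribers that a single publish would reach.
func (h *Hub) Targets(topic string, p Policy) []string {
	list := h.subs[topic]
	if len(list) == 0 {
		return nil
	}
	switch p {
	case PolicyRoundRobin:
		i := h.next[topic] % len(list)
		h.next[topic] = i + 1
		return []string{list[i]}
	case PolicyRandom:
		return []string{list[rand.Intn(len(list))]}
	default: // PolicyAll
		return append([]string(nil), list...)
	}
}

func main() {
	h := NewHub()
	for _, s := range []string{"sub1", "sub2", "sub3"} {
		h.Subscribe("TEST", s)
	}
	fmt.Println(h.Targets("TEST", PolicyAll)) // [sub1 sub2 sub3]
	h.Unsubscribe("TEST", "sub2")
	fmt.Println(h.Targets("TEST", PolicyRoundRobin))  // [sub1]
	fmt.Println(h.Targets("TEST", PolicyRoundRobin))  // [sub3]
	fmt.Println(len(h.Targets("TEST", PolicyRandom))) // 1
}
```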

Inter-node Communication

This project implements inter-node communication in two ways, grpc and zeromq, which correspond to the remote and zmqremote packages in the project. Import whichever package fits your requirements; both expose the same interface.

Using zeromq requires installing libzmq [https://github.com/zeromq/libzmq].

Performance

Microsoft Cloud Intra Node

mode \ message size   256B       512B       1k        10k       100k     1M      4M      8M
-------------------   ----       ----       --        ---       ----     --      --      --
grpc                  120000/s   100000/s   85000/s   40000/s   4600/s   490/s   123/s   Out of grpc default 4m limit
zeromq                170000/s   140000/s   10000/s   45000/s   4900/s   500/s   123/s   62/s

Serialization

The module uses protobuf for serialization and deserialization by default, which requires defining a series of protobuf message structures. To integrate with the serialization methods already used in the Ontology project and to reduce the amount of system modification, custom serialization and deserialization methods are also supported.

The current approach is to define a generic system message with the structure: message type + message content (the serialized data). For Ontology's commonly used structures, there are currently six message types (Address, Block, Header, Transaction, TxAttribute, VMCode), as follows:

enum MsgType {
  ADDRESS_MSG_TYPE = 0;
  BLOCK_MSG_TYPE = 1;
  HEADER_MSG_TYPE = 2;
  TX_MSG_TYPE    = 3;
  TX_ATT_MSG_TYPE = 4;
  VM_CODE_MSG_TYPE = 5;
}

message MsgData {
  MsgType msgType = 1;
  bytes data = 2;
}

To send, serialize the data in your custom manner and then construct MsgData{msgType: xx, data: xx}, choosing msgType from the enumeration above; the data field then carries the custom-serialized bytes. Receiving works the same way in reverse: after receiving a message, run the deserialization method that corresponds to msgType on data. A simple case is as follows:

server.go

func main() {
	log.Debug("test")
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	zmqremote.Start("127.0.0.1:8080")

	props := actor.
		FromFunc(
			func(context actor.Context) {
				switch msg := context.Message().(type) {
				case *zmqremote.MsgData:
					// Pick the deserializer that matches the message type.
					switch msg.MsgType {
					case 0: // deserialize msg.Data as an address
					case 1: // deserialize msg.Data as a Block
					case 2: // deserialize msg.Data as a header
					case 3: // deserialize msg.Data as a Transaction
					case 4: // deserialize msg.Data as a TxAttribute
					case 5: // deserialize msg.Data as VMCode
					}
					context.Sender().Tell(&zmqremote.MsgData{MsgType: 1, Data: []byte("123")})
				}
			}).
		WithMailbox(mailbox.Bounded(1000000))

	pid, _ := actor.SpawnNamed(props, "remote")
	fmt.Println(pid)

	for {
		time.Sleep(1 * time.Second)
	}
}

client.go

func main() {

	log.Debug("test")
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	var wg sync.WaitGroup

	messageCount := 500

	zmqremote.Start("127.0.0.1:8081")

	props := actor.
		FromProducer(newLocalActor(&wg, messageCount)).
		WithMailbox(mailbox.Bounded(1000000))

	pid := actor.Spawn(props)
	fmt.Println(pid)

	remotePid := actor.NewPID("127.0.0.1:8080", "remote")
	wg.Add(1)

	start := time.Now()
	fmt.Println("Starting to send")

	message := &zmqremote.MsgData{MsgType: 1, Data: []byte("123")}
	for i := 0; i < messageCount; i++ {
		remotePid.Request(message, pid)
	}

	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Elapsed %s", elapsed)

	x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
	fmt.Printf("Msg per sec %v", x)
}
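
The type-plus-bytes envelope used in server.go and client.go can also be shown end to end without the network. The sketch below is standalone and hedged: MsgData and the numeric type values follow the enum in the text, but the Header payload, its 12-byte layout, and the EncodeHeader and Decode helpers are hypothetical and are not Ontology's real structures or serialization.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// MsgType values follow the enum in the text; only two are handled here.
type MsgType int32

const (
	AddressMsgType MsgType = 0
	HeaderMsgType  MsgType = 2
)

// MsgData is the generic envelope: a type tag plus custom-serialized bytes.
type MsgData struct {
	MsgType MsgType
	Data    []byte
}

// Header is a hypothetical payload, not Ontology's real header structure.
type Header struct {
	Height    uint32
	Timestamp uint64
}

// EncodeHeader serializes h with a hand-written fixed layout and wraps the
// bytes in an envelope tagged as a header.
func EncodeHeader(h Header) *MsgData {
	buf := make([]byte, 12)
	binary.LittleEndian.PutUint32(buf[0:4], h.Height)
	binary.LittleEndian.PutUint64(buf[4:12], h.Timestamp)
	return &MsgData{MsgType: HeaderMsgType, Data: buf}
}

// Decode selects the deserializer that matches the envelope's type tag.
func Decode(m *MsgData) (interface{}, error) {
	switch m.MsgType {
	case AddressMsgType:
		return string(m.Data), nil // assumption: an address travels as raw text
	case HeaderMsgType:
		if len(m.Data) != 12 {
			return nil, errors.New("header: unexpected length")
		}
		return Header{
			Height:    binary.LittleEndian.Uint32(m.Data[0:4]),
			Timestamp: binary.LittleEndian.Uint64(m.Data[4:12]),
		}, nil
	default:
		return nil, fmt.Errorf("unsupported message type %d", m.MsgType)
	}
}

func main() {
	env := EncodeHeader(Header{Height: 42, Timestamp: 1516953710})
	out, err := Decode(env)
	fmt.Println(out, err) // {42 1516953710} <nil>
}
```

Because the envelope itself is the only protobuf message on the wire, new payload kinds need a new type value and a matching encoder and decoder, but no change to the transport.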

Benchmark

node2/main.go

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	remote.Start("127.0.0.1:8080")
	var sender *actor.PID
	props := actor.
		FromFunc(
			func(context actor.Context) {
				switch msg := context.Message().(type) {
				case *messages.StartRemote:
					fmt.Println("Starting")
					sender = msg.Sender
					context.Respond(&messages.Start{})
				case *messages.Ping:
					sender.Tell(&messages.Pong{})
				}
			}).
		WithMailbox(mailbox.Bounded(1000000))
	actor.SpawnNamed(props, "remote")
	for{
		time.Sleep(1 * time.Second)
	}
}

node1/main.go

type localActor struct {
	count        int
	wgStop       *sync.WaitGroup
	messageCount int
}

func (state *localActor) Receive(context actor.Context) {
	switch context.Message().(type) {
	case *messages.Pong:
		state.count++
		if state.count%50000 == 0 {
			fmt.Println(state.count)
		}
		if state.count == state.messageCount {
			state.wgStop.Done()
		}
	}
}

func newLocalActor(stop *sync.WaitGroup, messageCount int) actor.Producer {
	return func() actor.Actor {
		return &localActor{
			wgStop:       stop,
			messageCount: messageCount,
		}
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() * 1)
	runtime.GC()

	var wg sync.WaitGroup

	messageCount := 50000
	//remote.DefaultSerializerID = 1
	remote.Start("127.0.0.1:8081")

	props := actor.
		FromProducer(newLocalActor(&wg, messageCount)).
		WithMailbox(mailbox.Bounded(1000000))

	pid := actor.Spawn(props)

	remotePid := actor.NewPID("127.0.0.1:8080", "remote")
	remotePid.
		RequestFuture(&messages.StartRemote{
			Sender: pid,
		}, 5*time.Second).
		Wait()

	wg.Add(1)

	start := time.Now()
	fmt.Println("Starting to send")

	bb := bytes.NewBuffer([]byte(""))
	for i := 0; i < 2000; i++ {
		bb.WriteString("1234567890")
	}
	message := &messages.Ping{Data: bb.Bytes()}
	for i := 0; i < messageCount; i++ {
		remotePid.Tell(message)
	}

	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Elapsed %s", elapsed)

	x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
	fmt.Printf("Msg per sec %v", x)
}

messages/protos.proto

Protobuf file generation command: protoc -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf/ --gogoslick_out=plugins=grpc:. /path/to/protos.proto

syntax = "proto3";
package messages;
import "github.com/ontio/ontology-eventbus/actor/protos.proto";

message Start {}
message StartRemote {
    actor.PID Sender = 1;
}
message Ping {
    bytes Data = 1;
}
message Pong {}

Signature Verification Test

The code can be found in the directories example/testRemoteCrypto and example/testSyncCrypto.

The test environment comes from Microsoft Azure.

Asynchronous Signature Verification Test

Mode \ message size    256B     512B     1k       10k
-------------------    ----     ----     --       ---
One Machine(zeromq)    3666/s   3590/s   3479/s   2848/s
Two Machines(zeromq)   7509/s   7431/s   7204/s   6976/s

Synchronous Signature Verification Test

Metric \ message size             256B      512B      1k        10k
---------------------             ----      ----      --        ---
Time for Signature Verification   0.242ms   0.247ms   0.246ms   0.334ms
Latency                           1.36ms    1.31ms    1.39ms    1.94ms

This module is based on the AsynkronIT/protoactor-go project; for more details, see https://github.com/AsynkronIT/protoactor-go.
