shrbank / Syncspout

Licence: gpl-2.0

Programming language: Scala

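Introduction to SyncSpout

SyncSpout is a component for building interactive, synchronous Storm topologies. While building a real-time recommendation system, we wanted to apply Storm's concurrency and distributed computing capabilities to the request-response paradigm. For example, a customer's purchase is sent to a Storm topology as a message, and Storm returns recommendations within a specified time; in other words, Storm needs to be interactive. Against this background, the big data team developed SyncSpout. The component receives asynchronous messages from clients, processes them asynchronously in the Storm topology, and returns the result to the client within the specified time.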

Architecture diagram

(Image: architecture diagram)

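Key components

  • SyncSpout: a spout that implements Storm's IRichSpout; it receives call messages from clients and emits them into the topology
  • SendBolt: the bolt in the topology that sends out computed results; it returns the result to the client
  • SyncSpoutClient: sends synchronous messages to SyncSpout and retrieves the result within a specified time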

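Features

  • Makes an ordinary Storm application interactive
  • Clients reconnect automatically after the Storm application restarts
  • Almost non-intrusive for the Storm application, and non-intrusive for business logic
  • A result computed by the Storm cluster is returned to exactly the call that requested it on the specified client
  • Clients can send messages of any type to the Storm application; the Storm application can return messages of any type to clients
  • Clients can synchronously obtain the result returned by the Storm application within a specified time
  • Supports high concurrency: on a single machine with 1000 concurrent requests, responses generally come back within 100 ms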
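
Two of the features above, routing each result back to the exact call that requested it and waiting synchronously with a time limit, can be illustrated with a correlation-id table. The following is only a minimal sketch of that general technique, not SyncSpout's actual implementation; the class PendingCalls and all of its method names are hypothetical.

```scala
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical sketch: tracks in-flight calls by a unique call id.
final class PendingCalls[A] {
  // One promise per in-flight call, keyed by call id.
  private val pending = new ConcurrentHashMap[String, Promise[A]]()

  /** Registers a new call and returns the id to attach to the outgoing message. */
  def register(): String = {
    val id = UUID.randomUUID().toString
    pending.put(id, Promise[A]())
    id
  }

  /** Delivers a result for the given call id.
    * Returns false if the call is unknown, already answered, or already timed out. */
  def complete(id: String, result: A): Boolean = {
    val p = pending.get(id)
    p != null && p.trySuccess(result)
  }

  /** Blocks for at most timeoutMs milliseconds; returns None on timeout.
    * The call is forgotten afterwards, so late results are rejected. */
  def await(id: String, timeoutMs: Long): Option[A] = {
    val p = pending.get(id)
    if (p == null) None
    else
      try Some(Await.result(p.future, timeoutMs.millis))
      catch { case _: TimeoutException => None }
      finally pending.remove(id)
  }
}
```

In this sketch the client would call register() before sending, attach the id to the message, and call await(id, timeoutMs); whatever receives replies from the topology would call complete(id, result). Returning None on timeout is a design choice of the sketch; the usage section of this README states that SyncSpoutClient returns null in that case.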

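Similarities to and differences from Storm's official DRPC

  • Both receive a remote request, send it to a Storm topology, receive the result from the topology, and send the result back to the waiting client
  • DRPC can only handle strings; SyncSpout can handle any serializable type
  • DRPC can only handle "linear" DRPC topologies, where the computation is expressed as a sequence of steps; SyncSpout can handle any kind of Storm topology
  • DRPC's functionality was ported to Trident and deprecated in native Storm; SyncSpout will continue to be maintained by SHRB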
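
To make "any serializable type" concrete, here is a small round trip through standard Java serialization. This README does not say which serialization mechanism SyncSpout uses, so treat this purely as an illustration of the idea; PurchaseEvent and JavaSerde are hypothetical names introduced for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical message type. Scala case classes are Serializable by default.
final case class PurchaseEvent(userId: Long, itemIds: List[String])

// Assumption: plain Java serialization, used here only for illustration.
object JavaSerde {
  /** Serializes any java.io.Serializable value to a byte array. */
  def toBytes(value: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  /** Reads back a value previously written by toBytes. */
  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }
}
```

A structured request such as PurchaseEvent(42L, List("sku-1", "sku-2")) survives the round trip as a typed object, whereas a strings-only interface would require the caller to encode and parse it by hand.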

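Usage

Client

// Create the client; topName must be the name of the topology running on the Storm cluster
val client = new SyncSpoutClient(topName)
// Initialize the client
client.init()
// Send a message to the remote Storm cluster and wait up to 1000 ms for the reply; on timeout, null is returned
val syncResult = client.ask(ClientMsg("This is the message being sent; it can be of any type"), 1000).asInstanceOf[String]
println(s"The reply is [$syncResult]; it can be of any type")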
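
Because ask returns null on timeout, the cast in the client snippet silently produces a null String. One way to make the timeout case explicit is to wrap the call in an Option. The sketch below uses a stand-in trait, AskClient, because the exact signature and return type of SyncSpoutClient.ask are not shown in this README; both AskClient and SafeAsk are hypothetical.

```scala
// Stand-in for the part of SyncSpoutClient used in the snippet above.
// Assumption: ask takes a message and a timeout in milliseconds and returns
// the reply, or null on timeout. The real signature may differ.
trait AskClient {
  def ask(msg: Any, timeoutMs: Long): Any
}

object SafeAsk {
  /** Returns Some(reply) if a String reply arrives in time;
    * None on timeout (null) or if the reply has another type. */
  def askString(client: AskClient, msg: Any, timeoutMs: Long): Option[String] =
    Option(client.ask(msg, timeoutMs)).collect { case s: String => s }
}
```

Callers can then pattern match on Some and None instead of checking for null, and a reply of an unexpected type no longer fails with a ClassCastException.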

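Storm cluster

// Storm imports (package names for Storm 1.0 and later; older releases use backtype.storm instead)
import org.apache.storm.{Config, LocalCluster}
import org.apache.storm.topology.TopologyBuilder

val builder = new TopologyBuilder()
// SyncSpout receives the messages sent by clients
builder.setSpout("syncSpout", SyncSpout(), 2)
// SimpleBolt processes the messages
builder.setBolt("simpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("syncSpout")
// SendBolt returns the results to the client
builder.setBolt("sendBolt", new SendBolt(), 2).shuffleGrouping("simpleBolt")
val cluster = new LocalCluster()
// The client must be created with this same topology name
val topName = "SyncSpoutTop"
val conf = new Config()
conf.setNumWorkers(2)
cluster.submitTopology(topName, conf, builder.createTopology())
println("SyncSpout started successfully!")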

Notes

  • The topName passed when instantiating the client is the topology's name on the Storm cluster
  • The ZooKeeper host list must be configured in sync-spout-server.conf and sync-spout-client.conf

Third-party libraries

Contact

E-mail: echo MzY1NzgxMDYyQHFxLmNvbQo= | base64 -d

QQ group number: 620317570

QQ group QR code

(Image: QQ group QR code)
