All Projects → daoshenzzg → Socket Mqtt

daoshenzzg / Socket Mqtt

Licence: apache-2.0
基于Netty+MQTT的高性能推送服务框架。支持普通Socket、MQTT、MQTT web socket协议。非常方便接入上层业务实现推送业务。

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to Socket Mqtt

Jetlinks Community
JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,HTTP等),屏蔽网络编程复杂性,灵活接入不同厂家不同协议等设备。实时数据处理,设备告警,消息通知,数据转发。地理位置,数据可视化等。能帮助你快速建立物联网相关业务系统。
Stars: ✭ 2,405 (+665.92%)
Mutual labels:  mqtt, iot, websocket, netty
Iot Dc3
IOT DC3 is an open source, distributed Internet of Things (IOT) platform based on Spring Cloud. It is used for rapid development of IOT projects and management of IOT devices. It is a set of solutions for IOT system.
Stars: ✭ 195 (-37.9%)
Mutual labels:  rpc, mqtt, iot, socket
Netty Learning Example
🥚 Netty实践学习案例,见微知著!带着你的心,跟着教程。我相信你行欧。
Stars: ✭ 2,146 (+583.44%)
Mutual labels:  rpc, mqtt, iot, netty
Smart Socket
A High Performance Java AIO framework
Stars: ✭ 341 (+8.6%)
Mutual labels:  rpc, iot, socket
Blynk Server
Blynk is an Internet of Things Platform aimed to simplify building mobile and web applications for the Internet of Things. Easily connect 400+ hardware models like Arduino, ESP8266, ESP32, Raspberry Pi and similar MCUs and drag-n-drop IOT mobile apps for iOS and Android in 5 minutes
Stars: ✭ 8 (-97.45%)
Mutual labels:  mqtt, iot, netty
Platypush
A versatile and extensible platform for home and life automation with hundreds of supported integrations
Stars: ✭ 192 (-38.85%)
Mutual labels:  mqtt, iot, websocket
Groza
开源物联网平台 - 物联网解决方案的设备管理,数据收集,处理
Stars: ✭ 364 (+15.92%)
Mutual labels:  mqtt, iot, netty
Openremote
100% open-source IoT Platform - Integrate your assets, create rules, and visualize your data
Stars: ✭ 254 (-19.11%)
Mutual labels:  mqtt, iot, websocket
Getty
a netty like asynchronous network I/O library based on tcp/udp/websocket; a bidirectional RPC framework based on JSON/Protobuf; a microservice framework based on zookeeper/etcd
Stars: ✭ 532 (+69.43%)
Mutual labels:  rpc, websocket, netty
spring-boot-protocol
springboot功能扩充-netty动态协议,可以支持各种网络协议的动态切换(单端口支持多个网络协议).支持mmap,sendfile零拷贝,http请求批量聚合
Stars: ✭ 68 (-78.34%)
Mutual labels:  mqtt, netty, rpc
Paho.mqtt.wxapp
接MQTT相关项目 paho.mqtt.wxapp可以让你在微信小程序里连接MQTT broker,实现在小程序里控制硬件,也可用于游戏,已测试的broker有mosquitto、emqtt、hivemq、百度物联网 MQTT QQ群:679985050
Stars: ✭ 295 (-6.05%)
Mutual labels:  mqtt, iot, websocket
Iot Technical Guide
🐝 IoT Technical Guide --- 从零搭建高性能物联网平台及物联网解决方案和Thingsboard源码分析 ✨ ✨ ✨ (IoT Platform, SaaS, MQTT, CoAP, HTTP, Modbus, OPC, WebSocket, 物模型,Protobuf, PostgreSQL, MongoDB, Spring Security, OAuth2, RuleEngine, Kafka, Docker)
Stars: ✭ 2,334 (+643.31%)
Mutual labels:  mqtt, iot, websocket
Joynr
A transport protocol agnostic (MQTT, HTTP, WebSockets etc.) Franca IDL based communication framework supporting multiple communication paradigms (RPC, Pub-Sub, broadcast etc.)
Stars: ✭ 124 (-60.51%)
Mutual labels:  rpc, mqtt, websocket
Jstp
Fast RPC for browser and Node.js based on TCP, WebSocket, and MDSF
Stars: ✭ 132 (-57.96%)
Mutual labels:  rpc, websocket, socket
Saea
SAEA.Socket is a high-performance IOCP framework TCP based on dotnet standard 2.0; Src contains its application test scenarios, such as websocket,rpc, redis driver, MVC WebAPI, lightweight message server, ultra large file transmission, etc. SAEA.Socket是一个高性能IOCP框架的 TCP,基于dotnet standard 2.0;Src中含有其应用测试场景,例如websocket、rpc、redis驱动、MVC WebAPI、轻量级消息服务器、超大文件传输等
Stars: ✭ 318 (+1.27%)
Mutual labels:  rpc, mqtt, socket
Thingsboard
Open-source IoT Platform - Device management, data collection, processing and visualization.
Stars: ✭ 10,526 (+3252.23%)
Mutual labels:  mqtt, iot, netty
Netty Stroll
RPC基础通信框架
Stars: ✭ 77 (-75.48%)
Mutual labels:  rpc, socket, netty
T Io
解决其它网络框架没有解决的用户痛点,让天下没有难开发的网络程序
Stars: ✭ 1,331 (+323.89%)
Mutual labels:  websocket, socket, netty
Iot push
基于netty+mqtt3.1.1+springboot+jdk8 实现的 mqtt 服务端跟客户端
Stars: ✭ 353 (+12.42%)
Mutual labels:  mqtt, iot, netty
Jupiter
Jupiter是一款性能非常不错的, 轻量级的分布式服务框架
Stars: ✭ 1,372 (+336.94%)
Mutual labels:  rpc, socket, netty

socket-mqtt: Netty4.x + MQTT

这是一个基于Netty4.x + MQTT实现的Push推送基础框架。相比于原生Netty, socket-mqtt:

  • 为C/S模式开发封装简单统一的编程模式
  • 简单高性能的代码
  • 统一的连接管理方案
  • 统一的线程管理方案
  • 网络基础问题的解决与支持:如心跳保持、压缩解压缩、编码与解码、加密与解密等
  • 各种网络参数、连接池实现、监听器实现等可配置可替换
  • 可实现对等集群
  • 提供数据统计/监控组件
  • 支持普通socket、MQTT、MQTT web socket协议

项目结构

  • codec: 封装编码与解码
  • compression: 封装压缩与解压缩
  • count: 封装统计信息
  • database: 基于hsql的内存数据库
  • encrypt: 封装加密与解密
  • future: 封装同步和异步调用
  • listener: 封装事件监听,包括消息、通道、异常三类事件监听器
  • service: 封装C/S模型、通道、心跳管理、消息分发等核心模块

Linux内核参数配置

# 允许回收TCP连接
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1

# TCP 缓冲区内存
net.ipv4.tcp_rmem = 4096 87380 8388608
net.ipv4.tcp_wmem = 4096 87380 8388608
net.ipv4.tcp_mem = 94500000 915000000 927000000  

# ulimits 优化
fs.file-max = 1065353
kernel.pid_max = 65536
*	soft		nofile	655360  
*	hard		nofile	655360

压测报告

单Broker8核16G,支持44万连接;1万客户端 单消息1024B 下行tps: 16万+; 4000客户端 Publish 单消息1024B 上行tps: 17万+,千兆网卡流量基本打满。 备注:Mqtt Server启动内存只分配了5G,如果分配到10G,理论上可以支持百万连接。还有,测试开启了心跳上报。

消息下行能力

1万Clients订阅的消息下行能力 对应下行负载情况

消息上行能力

4000Clients订阅消息上行能力 对应上行负载情况

查看连接数情况

查看连接数(telnet 10.43.204.61 8001; get status) 查看连接数(ss -l)

使用说明

各种测试类的源码在src/test/java/com/yb/socket包路径下: 包括:

  • 普通socket Server/Client
  • MQTT socket Server/Client
  • 带注册中心的普通socket/MQTT socket
  • 基于内存数据库的模拟订阅推送

服务启动配置选项

Server server = new Server();
// 设置Broker端口
server.setPort(8000); 
// 设置启动信息统计。默认true
server.setOpenCount(true);
// 设置启用心跳功能。默认true
server.setCheckHeartbeat(true);
// 设置启动服务状态,默认端口8001。通过telnet server_ip 8001; get status查看服务信息
server.setOpenStatus(true);
// 服务状态端口。默认8001
server.setStatusPort(8001);
// 设置服务名称
server.setServiceName("Demo");
// 设置工作线程数量。默认CPU个数+1
server.setWorkerCount(64);
// 是否开户业务处理线程池。默认false
server.setOpenExecutor(true);
// 设置tcp no delay。默认true
server.setTcpNoDelay(true);
// 是否启用keepAlive。默认true
server.setKeepAlive(true);
// 自定义监听器,可处理相关事件
server.addEventListener(new EchoMessageEventListener());
// 设置Broker启动协议。SocketType.MQTT - MQTT协议; SocketType.NORMAL - 普通Socket协议;SocketType.MQTT_WS - MQTT web socket协议;
server.setSocketType(SocketType.MQTT);
// 绑定端口启动服务
server.bind();

MQTT web socket server DEMO

Server server = new Server();
server.setPort(8000);
server.addEventListener(new EchoMessageEventListener());
server.setSocketType(SocketType.MQTT_WS);
server.bind();

//模拟推送
String message = "this is a web socket message!";
MqttRequest mqttRequest = new MqttRequest((message.getBytes()));
while (true) {
    if (server.getChannels().size() > 0) {
        logger.info("模拟推送消息");
        for (WrappedChannel channel : server.getChannels().values()) {
            server.send(channel, "yb/notice/", mqttRequest);
        }
    }
    Thread.sleep(1000L);
}

MQTT web socket client(浏览器)

可用在线mqtt测试:http://www.tongxinmao.com/txm/webmqtt.php
Topic	Payload	Time	QoS
yb/notice/	this is a web socket message!	2019-2-27 16:54:54	0

Normal socket server DEMO

Server server = new Server();
server.setPort(8000);
server.addEventListener(new JsonEchoMessageEventListener());
server.addChannelHandler("decoder", new JsonDecoder());
server.addChannelHandler("encoder", new JsonEncoder());
server.bind();

//模拟推送
JSONObject message = new JSONObject();
message.put("action", "echo");
message.put("message", "this is a normal socket message!");

Request request = new Request();
request.setSequence(0);
request.setMessage(message);
while (true) {
    if (server.getChannels().size() > 0) {
        logger.info("模拟推送消息");
        for (WrappedChannel channel : server.getChannels().values()) {
            channel.send(request);
            Thread.sleep(5000L);
        }
    }
}

Normal socket client DEMO

Client client = new Client();
client.setIp("127.0.0.1");
client.setPort(8000);
client.setConnectTimeout(10000);
client.addChannelHandler("decoder", new JsonDecoder());
client.addChannelHandler("encoder", new JsonEncoder());
client.connect();

for (int i = 0; i < 2; i++) {
    JSONObject message = new JSONObject();
    message.put("action", "echo");
    message.put("message", "hello world!");

    Request request = new Request();
    request.setSequence(i);
    request.setMessage(message);
    Response response = (Response) client.sendWithSync(request, 3000);

    logger.info("成功接收到同步的返回: '{}'.", response);
}

client.shutdown();

带注册中心 center DEMO

Server server = new Server();
server.setPort(9000);
server.setCheckHeartbeat(false);
server.addChannelHandler("decoder", new JsonDecoder());
server.addChannelHandler("encoder", new JsonEncoder());
server.addEventListener(new com.yb.socket.center.CenterMockMessageEventListener());
server.bind();

带注册中心 server DEMO

Server server = new Server();
server.setPort(8000);
server.setCheckHeartbeat(false);
server.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");
server.addEventListener(new JsonEchoMessageEventListener());
server.bind();

带注册中心 client DEMO

Client client = new Client();
client.setCheckHeartbeat(false);
client.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");
client.addChannelHandler("decoder", new JsonDecoder());
client.addChannelHandler("encoder", new JsonEncoder());
client.connect();

JSONObject message = new JSONObject();
message.put("action", "echo");
message.put("message", "hello");

for (int i = 0; i < 5; i++) {
    Request request = new Request();
    request.setSequence(i);
    request.setMessage(message);
    client.send(request);
    Thread.sleep(5000L);
}

后续规划

  • 支持MQTT主题过滤机制
  • 支持SSL连接方式
  • 完整的QoS服务质量等级实现DEMO
  • 遗嘱消息, 保留消息及消息分发重试

压测工具

参考项目

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].