
Jaskey / RocketMQDedupListener

Licence: other
An idempotent, deduplicating RocketMQ message consumer that supports using MySQL or Redis as the idempotency table. Ready to use out of the box.

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives to or similar to RocketMQDedupListener

Java-CS-Record
Notes on the knowledge studied and reviewed while preparing for spring internship recruitment (organized by module, not a quick interview-question cram). Note: updates are paused; please follow the blog for future content.
Stars: ✭ 73 (-44.7%)
Mutual labels:  rocketmq
splink
Implementation of Fellegi-Sunter's canonical model of record linkage in Apache Spark, including EM algorithm to estimate parameters
Stars: ✭ 181 (+37.12%)
Mutual labels:  deduplication
weweibuy-framework
Basic components built on Spring Boot, including: HTTP request/response logging, log masking, APM, encryption/decryption, signing (AES, BCrypt, RSA, JWT), database masking, message masking, underscore-style URL parameters, unified exception handling, Feign mock, Feign logging, Feign message-format conversion, cross-application exception propagation, an automatic compensation component, an idempotency component, and a RocketMQ client.
Stars: ✭ 24 (-81.82%)
Mutual labels:  rocketmq
rocketmq
RocketMQ client for Go, supporting both producer and consumer.
Stars: ✭ 29 (-78.03%)
Mutual labels:  rocketmq
alibaba-middleware-race-preliminary
Preliminary-round problem of the 2016 Alibaba Middleware Performance Challenge: real-time calculation of Double 11 transaction totals with RocketMQ, JStorm, and Tair.
Stars: ✭ 42 (-68.18%)
Mutual labels:  rocketmq
compose-pay
🔱 HA distributed payment gateway. Highly available distributed payment gateway / payment front-end / payment system / WeChat Pay / Alipay.
Stars: ✭ 21 (-84.09%)
Mutual labels:  rocketmq
taotao-cloud-project
Microservice development scaffold including big-data, microservice, and front-end modules. A microservice architecture based on Spring Cloud Alibaba that wraps the basic capabilities of the technical framework so developers can focus on business logic; it also collects work summaries and accumulated technical experience.
Stars: ✭ 76 (-42.42%)
Mutual labels:  rocketmq
record-linkage-resources
Resources for tackling record linkage / deduplication / data matching problems
Stars: ✭ 67 (-49.24%)
Mutual labels:  deduplication
dduper
Fast block-level out-of-band BTRFS deduplication tool.
Stars: ✭ 108 (-18.18%)
Mutual labels:  deduplication
jungle
Microservice integration development framework that supports one-click generation of microservice projects and integrates Dubbo, RocketMQ, a state machine, and the Spring framework.
Stars: ✭ 24 (-81.82%)
Mutual labels:  rocketmq
acid-store
A library for secure, deduplicated, transactional, and verifiable data storage
Stars: ✭ 48 (-63.64%)
Mutual labels:  deduplication
SpringBootIntegration
A SpringBoot integration learning project (SpringBoot Integration).
Stars: ✭ 20 (-84.85%)
Mutual labels:  rocketmq
entity-embed
PyTorch library for transforming entities like companies, products, etc. into vectors to support scalable Record Linkage / Entity Resolution using Approximate Nearest Neighbors.
Stars: ✭ 96 (-27.27%)
Mutual labels:  deduplication
short
URL shortening service. A high-performance short-link service.
Stars: ✭ 14 (-89.39%)
Mutual labels:  rocketmq
gencore
Generate duplex/single consensus reads to reduce sequencing noises and remove duplications
Stars: ✭ 91 (-31.06%)
Mutual labels:  deduplication
IntraArchiveDeduplicator
Tool for managing data-deduplication within extant compressed archive files, along with a relatively performant BK tree implementation for fuzzy image searching.
Stars: ✭ 87 (-34.09%)
Mutual labels:  deduplication
OpenLogReplicator
Open Source Oracle database CDC written purely in C++. Reads transactions directly from database redo log files and streams in JSON or Protobuf format to: Kafka, RocketMQ, flat file, network stream (plain TCP/IP or ZeroMQ)
Stars: ✭ 112 (-15.15%)
Mutual labels:  rocketmq
order-charge-notify
Simulates the complete e-commerce flow of placing an order -> order processing -> order result notification, implemented with RocketMQ.
Stars: ✭ 46 (-65.15%)
Mutual labels:  rocketmq
dgiot-dashboard
Industry application extension plugins for the DG-IoT platform (DG-IoT application plugins).
Stars: ✭ 229 (+73.48%)
Mutual labels:  rocketmq
triton
Triton is a high-performance MQ consumer supporting Kafka, RabbitMQ, RocketMQ, NSQ, and other message queues.
Stars: ✭ 19 (-85.61%)
Mutual labels:  rocketmq

RocketMQDedupListener

A general-purpose idempotent, deduplicating RocketMQ consumer utility. Ready to use out of the box.

  1. Supports Redis or MySQL as the idempotency (dedup) table.
  2. Supports deduplication by business key or by message ID (the default).
  3. Supports concurrency control over duplicate messages (until a message has finished consuming, successfully or not, a duplicate of it will not be consumed concurrently).
  4. Provides close to EXACTLY-ONCE semantics (a message is successfully consumed once and only once); in extreme scenarios it falls back to AT-LEAST-ONCE semantics (a message is successfully consumed at least once, so the dedup enhancement never causes message loss).

How the built-in deduplication works

See the flow chart below:

[flow chart image]
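For readers who cannot view the image, here is a simplified, self-contained sketch of the idea. An in-memory map stands in for the Redis/MySQL idempotency table; the class and method names below are illustrative only and are not this library's actual API (the real logic lives in DedupConcurrentListener):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Illustrative sketch of the dedup flow, NOT the library's implementation.
public class DedupFlowSketch {

    enum Status { CONSUMING, CONSUMED }

    private final ConcurrentMap<String, Status> idempotencyTable = new ConcurrentHashMap<>();

    /** Returns true if the message should be acked, false if it should be retried later. */
    public boolean consumeWithDedup(String dedupKey, Supplier<Boolean> businessLogic) {
        // Atomically record the key as CONSUMING (the role Redis SETNX or a MySQL
        // INSERT on the unique key plays in the real implementation).
        Status previous = idempotencyTable.putIfAbsent(dedupKey, Status.CONSUMING);

        if (previous == null) {                              // first delivery of this key
            boolean ok = businessLogic.get();                // run the business logic (doHandleMsg)
            if (ok) {
                idempotencyTable.put(dedupKey, Status.CONSUMED); // later duplicates are just acked
            } else {
                idempotencyTable.remove(dedupKey);           // allow RocketMQ's retry to consume it again
            }
            return ok;
        }
        if (previous == Status.CONSUMED) {
            return true;                                     // already consumed successfully: ack the duplicate
        }
        return false;                                        // still CONSUMING elsewhere: fail so it is retried later
    }
}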

Background of the deduplication design

See the author's post on the official RocketMQ WeChat account: RocketMQ消息幂等的通用解决方案 (A General Solution for RocketMQ Message Idempotency).

Quick Start

1. Extend the DedupConcurrentListener class and implement the message-handling callback and the dedup-key callback:

import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// DedupConcurrentListener and DedupConfig are provided by this library; import them from its package.

public class SampleListener extends DedupConcurrentListener {

    private static final Logger log = LoggerFactory.getLogger(SampleListener.class);

    public SampleListener(DedupConfig dedupConfig) {
        super(dedupConfig);
    }

    // What the deduplication is based on; each kind of message can use a different key.
    // The value returned here is used as the dedup key before deduplication is applied.
    @Override
    protected String dedupMessageKey(MessageExt messageExt) {
        // For simplicity this sample uses the message body as the dedup key;
        // this is not recommended for production use.
        if ("TEST-TOPIC".equals(messageExt.getTopic())) {
            return new String(messageExt.getBody());
        } else { // all other messages use the default (the message ID)
            return super.dedupMessageKey(messageExt);
        }
    }

    @Override
    protected boolean doHandleMsg(MessageExt messageExt) {
        switch (messageExt.getTopic()) {
            case "TEST-TOPIC":
                log.info("Pretending to consume for a long time....{} {}", new String(messageExt.getBody()), messageExt);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                break;
        }
        return true;
    }
}

2. Start a RocketMQ consumer with this listener instance:


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");

//START: code that differs from ordinary RocketMQ usage
String appName = consumer.getConsumerGroup(); // which application the dedup applies to; the same message is deduplicated separately in different applications
StringRedisTemplate stringRedisTemplate = null; // obtaining the StringRedisTemplate is omitted here; the idempotency records will be stored in Redis
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
//END: code that differs from ordinary RocketMQ usage

consumer.registerMessageListener(messageListener);
consumer.start();

Notes:

  1. The RocketMQ consumer configuration and the way the StringRedisTemplate is obtained are omitted above; you need to provide them yourself (a reference sketch follows below).
  2. RocketMQDedupListener supports deduplication with either Redis or MySQL; see SampleListener.java for more usage details.
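For reference only, one possible way to build the StringRedisTemplate outside of Spring Boot auto-configuration. This is a minimal sketch using Spring Data Redis with Lettuce; the RedisTemplateFactory class name, host, and port below are placeholders and not part of this project:

import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

public class RedisTemplateFactory {

    // Builds a StringRedisTemplate against a standalone Redis instance (placeholder helper).
    public static StringRedisTemplate create(String host, int port) {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(host, port);
        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(config);
        connectionFactory.afterPropertiesSet();   // initialize the connection factory before use
        StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
        template.afterPropertiesSet();            // initialize the template before use
        return template;
    }
}

In a Spring Boot application you would normally just inject the auto-configured StringRedisTemplate bean instead.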

Usage scenario tests

Below are the output logs and observed behavior from a few experiments (based on the SampleListener.java implementation), which readers can reproduce and compare against.

Test 1: an ordinary duplicate message

  1. Simulate a normal message: send a message to TEST-TOPIC with body test-ljj-msg1234.
  2. Simulate a duplicate message: a few seconds later (more than 3 seconds in this example), send another message to TEST-TOPIC with the same body test-ljj-msg1234.

The log output is as follows:

[INFO] 2020-05-15 11:06:17,697 []  >>> Pretending to consume for a long time....test-ljj-msg1234 MessageExt [queueId=1, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1589511454575, bornHost=/10.13.32.179:52637, storeTimestamp=1589511454576, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA31B0, commitLogOffset=1055535536, bodyCRC=1038040938, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TEST-TOPIC, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1589511977328, UNIQ_KEY=0A0D20B3632A5B2133B14A730F6F014A, WAIT=true}, body=16]]
[INFO] 2020-05-15 11:06:20,748 []  >>> consume [1] msg(s) all successfully
[WARN] 2020-05-15 11:06:26,504 []  >>> message has been consumed before! dedupKey = DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg1234","tag":"","topic":"TEST-TOPIC"}, msgId : 0A0D20B3632A5B2133B14A7332DB014B , so just ack. RedisPersist
[INFO] 2020-05-15 11:06:26,504 []  >>> consume [1] msg(s) all successfully

Explanation:

As the logs show, the second message was deduplicated directly and never reached the business test code.

Test 2: concurrent duplicate consumption

  1. Simulate a normal message: send a message to TEST-TOPIC with body test-ljj-msg123.
  2. Simulate a duplicate message: immediately (within less than 3 seconds in this example) send another message to TEST-TOPIC with the same body test-ljj-msg123.

Because this case needs some extra explanation, annotations have been added to the logs below.

At second 33 the first message arrives; consuming it takes 3 seconds here.
[INFO] 2020-05-15 11:07:33,756 []  >>> Pretending to consume for a long time....test-ljj-msg123 MessageExt [queueId=1, storeSize=168, queueOffset=2, sysFlag=0, bornTimestamp=1589511530879, bornHost=/10.13.32.179:52651, storeTimestamp=1589511530881, storeHost=/10.13.32.179:10911, msgId=0A0D20B300002A9F000000003EEA3302, commitLogOffset=1055535874, bodyCRC=146853239, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TEST-TOPIC, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1589512053623, UNIQ_KEY=0A0D20B3632A5B2133B14A74397F014C, WAIT=true}, body=15]]

At second 35 the duplicate message arrives and finds the earlier message is still being consumed.
[WARN] 2020-05-15 11:07:35,884 []  >>> the same message is considered consuming, try consume later dedupKey : DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg123","tag":"","topic":"TEST-TOPIC"}, 0A0D20B3632A5B2133B14A7441FB014D, RedisPersit
The consumption is treated as a failure, which triggers delayed re-consumption.
[WARN] 2020-05-15 11:07:35,884 []  >>> consume [1] msg(s) fails, ackIndex = [-1]

At second 36 the first message finishes consuming successfully.
[INFO] 2020-05-15 11:07:36,801 []  >>> consume [1] msg(s) all successfully

At second 46 the delayed consumption of the second message starts and finds that this message has already been consumed successfully.
[WARN] 2020-05-15 11:07:46,024 []  >>> message has been consumed before! dedupKey = DedupElement={"application":"repay-platform-core","msgUniqKey":"test-ljj-msg123","tag":"","topic":"TEST-TOPIC"}, msgId : 0A0D20B3632A5B2133B14A7441FB014D , so just ack. RedisPersit

So it is treated directly as consumed successfully.
[INFO] 2020-05-15 11:07:46,024 []  >>> consume [1] msg(s) all successfully

Explanation:

The second message was delivered to the consumer while the first was still being consumed. At that point the dedup logic applied concurrency control, keeping the business code safe.


MySQL deduplication support

To store the message consumption records in MySQL instead, simply replace the StringRedisTemplate with a JdbcTemplate:

JdbcTemplate jdbcTemplate = null; // obtaining the JdbcTemplate is omitted here; the idempotency records will be stored in the MySQL table t_rocketmq_dedup
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, jdbcTemplate);

You also need to create the message deduplication table in advance, with the following structure:

-- ----------------------------
-- Table structure for t_rocketmq_dedup
-- ----------------------------
DROP TABLE IF EXISTS `t_rocketmq_dedup`;
CREATE TABLE `t_rocketmq_dedup` (
  `application_name` varchar(255) NOT NULL COMMENT 'Name of the consuming application (the consumer group name can be used)',
  `topic` varchar(255) NOT NULL COMMENT 'Topic the message came from (messages from different topics are not considered duplicates)',
  `tag` varchar(16) NOT NULL COMMENT 'Tag of the message (within the same topic, different tags are not considered duplicates even with the same dedup key); store "" if there is no tag',
  `msg_uniq_key` varchar(255) NOT NULL COMMENT 'Unique key of the message (a business key is recommended)',
  `status` varchar(16) NOT NULL COMMENT 'Consumption status of this message',
  `expire_time` bigint(20) NOT NULL COMMENT 'Expiry time of this dedup record (timestamp)',
  UNIQUE KEY `uniq_key` (`application_name`,`topic`,`tag`,`msg_uniq_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;


Note: application_name is stored because different applications must be supported, and the topic and tag are stored because the same business key may come from different topics/tags. A reference sketch for building the JdbcTemplate follows below.
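For reference only, a minimal sketch of how the JdbcTemplate could be built without Spring Boot auto-configuration; the JdbcTemplateFactory class name and the connection parameters are placeholders and not part of this project:

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

public class JdbcTemplateFactory {

    // Builds a JdbcTemplate pointing at the database that holds the t_rocketmq_dedup table (placeholder helper).
    public static JdbcTemplate create(String jdbcUrl, String user, String password) {
        DriverManagerDataSource dataSource = new DriverManagerDataSource(jdbcUrl, user, password);
        return new JdbcTemplate(dataSource);
    }
}

In a Spring Boot application you would normally just inject the auto-configured JdbcTemplate bean instead.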

Will continuous consumption failures cause an infinite loop?

No. The delayed re-consumption triggered by a failure (or by a duplicate that is still being consumed) relies on RocketMQ's native retry mechanism, which retries 16 times by default. If you want to adjust the retry delay or the number of retries, configure the consumer yourself.
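For illustration, the maximum retry count can be adjusted on the push consumer (the value 5 below is just an example):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
// Cap how many times a failed (or still-consuming) message is re-delivered before it is
// routed to the dead-letter queue; RocketMQ's default for concurrent consumers is 16.
consumer.setMaxReconsumeTimes(5);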

About the author

Apache RocketMQ Committer, author of the Zhihu column "RocketMQ详解" (RocketMQ Explained), and a contributor to the official RocketMQ WeChat account.
