博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq 延迟队列的实现
阅读量:6713 次
发布时间:2019-06-25

本文共 2062 字,大约阅读时间需要 6 分钟。

hot3.png

rocketmq 延迟队列的实现 博客分类: java MQ

流程描述:

 

1. producer发消息,设置一个延迟level值. 

 

“设置消息延时 10s 消费”的 Producer 端代码如下:

 

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

 

[java]
  1. Message msg = newMessage(topic, tags, keys, body);  
  2. msg.setDelayTimeLevel(3);  
  3. ...  
  4. SendResult sendResult = getMQProducer().send(msg);  

 

2. broker 保存消息时替换了topic,和queueId(一个level计算得到一个queueId,并将实际的topic和queueId作为properties保存).

 

3. broker有定时任务(其实是个consumer)消费延迟消息,如果到达延迟时间,将消息取出,改回原来的topic和queueId,放入到commitLog中,然后被真正的消费者.

 

疑难点:

 

 问: 如何保证rocketMq的offset移动和延迟消息不冲突?

 

 答: rocketMq当消息真正要消费的时候才把消息放到对应的topic中. 中间先保存到其他地方(利用原有的存储,消费机制,自然是一个topic). rocketMq很巧妙的将用户配置的level和queueId进行了一对一映射. 这样就能保证同一个queue下的消息肯定是顺序消费的. 

 

 

 

代码细节:

 

1. producer 发 消息,设置一个level值.

 

服务端MessageStoreConfig.messageDelayLevel 默认值是

 

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

 

2. 服务端接受到消息后
MessageConst.PROPERTY_REAL_TOPI
commitLog.putMessage(MessageExtBrokerInner)里,会把配置了延迟level的消息,存到ScheduleMessageService.SCHEDULE_TOPIC(值为SCHEDULE_TOPIC_XXXX) ,
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());中真正的topic和queueId作为msg暂时存起来.
// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,    String.valueOf(msg.getQueueId()));
3.有个定时任务模拟消费者消费该queueScheduleMessageService.DeliverDelayedMessageTimerTask 内,判断是否可消息,可以就取出消息,将topic和quueeId还原,放到commitLog中
PutMessageResult putMessageResult =        ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);//
其中
private static final long FIRST_DELAY_TIME = 1000L; //定时任务第一次启动时延迟时间private static final long DELAY_FOR_A_WHILE = 100L;  // 死循环轮训时延迟时间. 又生成一个task,这样避免很多线程一直在执行.比较好的死循环策略.private static final long DELAY_FOR_A_PERIOD = 10000L; // put到commitLog出错时的延迟时间
 
 
 

 

参考文献:http://www.tuicool.com/articles/aU7JRz7       主要根据该文章的关键字去看对应的源代码,https://github.com/alibaba/RocketMQhttp://blog.csdn.net/fei33423/article/details/51189430

 

转载于:https://my.oschina.net/xiaominmin/blog/1598959

你可能感兴趣的文章
《Web异步与实时交互——iframe AJAX WebSocket开发实战》—— 2.2 相关关键技术及工作原理...
查看>>
《Nmap渗透测试指南》—第1章1.5节Mac OS安
查看>>
重磅,企业实施大数据的路径
查看>>
linux之cp/scp命令+scp命令详解
查看>>
Spark 源码分析 -- BlockStore
查看>>
《C语言编程初学者指南》一1.7 创建并运行第一个C程序
查看>>
学习和使用 PHP 应该注意的10件事
查看>>
《Ember.js实战》——2.5 Ember.js对象模型
查看>>
《响应式Web图形设计》一第13章 响应Web设计中的图像
查看>>
shiro session 监听
查看>>
定时任务框架Quartz的新玩法
查看>>
段前缀的使用(0504)
查看>>
.NET Framework 源码
查看>>
开源大数据周刊-第6期
查看>>
centos上一键安装jdk、tomcat脚本
查看>>
排序算法 时间、空间复杂度
查看>>
心痛的感觉
查看>>
class - function ES6类的方法的两种定义方式及调用方式
查看>>
flex容器主轴上的部分元素单独设置位置
查看>>
window10安装Ubuntu虚拟机踩坑系列
查看>>