前言
AWS MQ是完全托管的 ActiveMQ 服务, 最近需要使用, 于是学习其文档, 实践其特性, 由于 ActiveMQ 支持非常丰富的协议, OpenWire amqp stomp mqtt, 所以也学习了各大协议的特性及其SDK.
安装
本地开发最方便的方式当然是docker了, rmohr/activemq 文档比较好的且有aws支持的5.15.6版本的tag.
需要注意的是, 首先要根据其docker hub镜像文档上的几步操作, 将镜像中的默认配置文件复制到自定义的本机conf目录下 /usr/local/activemq/conf
, 然后就快速地启动了一个默认配置的 ActiveMQ server
active mq |
特性
Advisory
ActiveMQ可以将本身的一些事件投递到系统的消息队列, 如 queue/topic的创建, 没有消费者的queue/topic等. http://activemq.apache.org/advisory-message.html
这个特性对于监控MQ非常有用, 默认配置时关闭的, 需要在配置文件activemq.xml中打开.
Wildcards
通配符
. 用于分割名字中的多个单词 |
通配符可以用在配置文件中表名作用范围, 也可以用于订阅时的destination名字, 这个功能很不错.
Virtual Topic
所谓virtual topic 就是将一个正常的topic, 变成了多个queue. 如TopicA启用了Virtual topic, 则consumer可以去消费 Consumer.xxx.TopicA 这样模式的queue的消息. (http://activemq.apache.org/virtual-destinations.html)
xxx对应类似NSQ中的Channel概念.
需要在activemq.xml中配置virtualDestinationInterceptor的范围 prefix及其他选项.
name=">"
表示所有的topic都启用virtualTopic功能.prefix="Consumer.*."
表示可以订阅的virtualTopic的pattern是Consumer..
<destinationInterceptors> |
Delay & Schedule
ActiveMQ支持延时消息及定时消息, 在message header中带上如下字段即可, 其中AMQ_SCHEDULED_PERIOD的最大值是long的最大值, 所以可以设置延时很长时间.
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | The time in milliseconds that a message will wait before being scheduled to be delivered by the broker |
AMQ_SCHEDULED_PERIOD | long | The time in milliseconds to wait after the start time to wait before scheduling the message again |
AMQ_SCHEDULED_REPEAT | int | The number of times to repeat scheduling a message for delivery |
AMQ_SCHEDULED_CRON | String | Use a Cron entry to set the schedule |
Dead Letter Queue
如果broker投递给消费者消息, 没有ACK或NACK, 则会触发重新投递, 投递超过一定次数则会进入死信队列, 默认只有一个公共的死信队列ActiveMQ.DLQ, 如果需要给topic分别设置死信队列, 则要在修改activemq.xml.
<broker> |
默认非持久化的topic不会进入到死信队列中, 如果需要, 则修改activemq.xml, 加入
<!-- |
实践
STOMP
STOMP是Simple (or Streaming) Text Orientated Messaging Protocol 的缩写, 设计思路借鉴了HTTP, 有content-type, header, body, frame based, text based等类似HTTP的相关概念, 设计文档 < https://stomp.github.io/stomp-specification-1.2.html>, 非常得简洁, 一页就讲完了.
协议细节及特点:
- 对于重复的header key, 只有第一个有效.
- 服务端可以限制消息大小, header field数量, header长度.
- 一个client开多个subscriber时, 必须设置subscribe id.
- NACK command 表示 requeue.
- stomp有事务的概念, 消息从producer发出到broker确认收到算一个事务, broker投递到consumer ACK算一个事务, 事务具有原子性.
- 支持SSL.
ActiveMQ作为STOMP server
支持 v1.1版本的STMOP协议.
默认最大消息长度
maxDataLength
为104857600
,maxFrameSize
为MAX_LONG
.通过
destination
名字前缀是/queue/
还是/topic
/ 来区分是queue
(生产消费模型)还是topic
(发布订阅模型). 真正的名字是去掉包括两个/
符号的前缀后的.发送默认不是持久化的, 需要在SEND时手动指定
persistent:true
的header以开启持久化.订阅默认不是持久化的, 需要在SUBSCRIBE时手动指定
activemq.subscriptionName:订阅者名字
的header来开启持久化订阅.很多特性都是靠STOMP header来处理的, ActiveMQ官方文档上有两节讲STOMP的header. http://activemq.apache.org/stomp.html#Stomp-StompExtensionsforJMSMessageSemantics
SDK
https://github.com/go-stomp/stomp 是目前star数最高的
- 提了个PR https://github.com/go-stomp/stomp/pull/58
- 解决了个issue https://github.com/go-stomp/stomp/issues/47
demo 代码
package main |
MQTT
协议文档http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
翻译版文档https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html
协议细节及特点:
- transport支持TCP, 也支持WebSocket, 所以定位于IOT.
- 不支持生产消费模型, 只支持发布订阅模型.
- 用QOS来表示消息队列中的投递语义, QOS=0 表示至多发送一次, QOS=1表示至少发送一次, QOS=2表示精确地只发送一次.
ActiveMQ作为MQTT server
- 通配符不同, MQTT的
/
+
#
分别对应 ActiveMQ的.
*
>
. - QOS=0对应的是非持久化的topic, QOS=1或者QOS=2对应持久化的topic.
AMQP
协议文档: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html
AMQP相比 stomp mqtt 就复杂得多, 毕竟名字就是高级消息队列(Advanced Message Queuing Protocol ).
协议细节及特点:
- AMQP有很多不同的概念, 如Link, Container, Node. 不看模型文档的话就直接使用SDK的话会比较费劲. ContainerID对应ActiveMQ client ID, LinkName对应ActiveMQ subscription name.
ActiveMQ作为AMQP server
- 使用1.0协议, 所以使用了0.9.1的2k star的sdk不能用.(https://github.com/streadway/amqp), 而且官方也认为没必要支持旧版本的协议.
- 默认最大消息长度
maxDataLength
为104857600
(100MB),maxFrameSize
为MAX_LONG
, consumer持有的未确认最大消息数量prefetch
为1000,producerCredit
为10000. 可通过连接的URI设定. - 支持SSL.
- 通过
destination
名字前缀是queue://
还是topic://
来区分是queue
(生产消费模型)还是topic
(发布订阅模型). 真正的名字是去掉包括两个/
符号的前缀后的.
性能
分别使用
github.com/vcabbage/amqp 76star 13issue 5contributors
github.com/go-stomp/stomp 132star 3issue 14contributors
github.com/eclipse/paho.mqtt.golang 650star 20issue 34contributors
作为SDK, 分别测试了下pub sub 1KB大小的消息普通场景.
publish性能上, amqp=stomp>mqtt, amqp和stomp差不多, 是mqtt的两倍多.
subscribe性能上, amqp比stomp快一点, mqtt则慢很多.
benchmark代码
package all_bench |
一些细节行为
官方的FAQ里面写了一些实现的细节
- 如果producer比较快而consumer比较慢的话, ActiveMQ的流量控制功能使得producer阻塞. http://activemq.apache.org/what-happens-with-a-fast-producer-and-slow-consumer.html
- 不支持消费者拿到消息之后Requeue, 即不支持像NSQ那样的消费者出现业务逻辑错误后重试.http://activemq.apache.org/how-do-i-unack-the-message-with-stomp.html. 但是可以利用延时消息实现类似的功能.
性能调优
如果使用了virtualTopic, 那么默认配置下, virtualTopic对应的Queue越多, 发送越慢, 因为默认virtualTopic转发到queue是串行的, 需要调整
concurrentSend=true
启用并发发送到queue.https://activemq.apache.org/virtual-destinations
https://issues.jboss.org/browse/ENTMQ-1093
https://github.com/apache/activemq/blob/9abe2c6f97c92fc99c5a2ef02846f62002a671cf/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java#L87concurrentStoreAndDispatchQueues
设置为false. 默认配置下, 这个值是true, 根据文档所说在快速消费者情况下, 此值设置为true可以加快持久化消息的性能, 因为被快速消费了消息可以不用落盘, 但实测发现此值为true则10个producer并发发送和1个producer并发发送的性能是一样的没有提高. 设置为false之后提高producer并发则可获得性能倍速提高, 并且单个producer的发送性能并没有下降.启用
mKahaDB
, ActiveMQ为了减少打开的文件描述符数量, 默认是用一个KahaDB实例来持久化消息, 但是在磁盘性能比较好的情况下, 一个kahaDB实例发挥不出磁盘的潜力, 启用多个kahaDB后性能可以获得倍速增长. 可以按queue名字的pattern来设置多个kahaDB实例, 也可以使用perDestination="true"
设置每个queue一个kahaDB实例, 但这个参数也有坑, 如果destination名字超过了42个字符串, 则会被截断, 发送会报不可恢复的错. 可解决的办法是手动分好destination使用的kahadb, 但是这个配置后续不能动态改了, 只能新开Broker然后迁移. 否则会重启后如果分配规则改变导致分配到了不同的kahadb, 则之前的数据不会被消费.http://sigreen.github.io/2016/02/10/amq-tuning.html
https://activemq.apache.org/kahadb#multim-kahadb-persistence-adapter