在了解RabbitMQ之前,首先要了解AMQP协议。AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受不同客户端/中间件产品,不同开发语言等条件的限制。
AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS
实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持
C 、Ruby、Java、JMS、Python和.NET
一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、
C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台
一个高性能的消息平台,在分布式消息网络可作为兼容AMQP的Broker节点,绑定了多种语言,包括Python、
C、C 、Lisp、Ruby等
是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTfulHTTP访问网络AMQP的能力
一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。每个连接(包括所有channel)都必须关联至一个虚拟主机
所有这些组件的属性各不相同,但是只有交换器和队列被命名。客户端可以通过交换器的名字来发送消息,也可以通过队列的名字收取信息。因为AMQ协议没有一个通用的标准方法来获得所有组件的名称,所以客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有自己知道的名字。
绑定器没有名字,它们的生命期依赖于所紧密连接的交换器和队列。如果这两者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还需要设置消息路由。
模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,bindingkey*.stock.#匹配routing-keyusd.stcok和eur.stock.db,但是不匹配stock.nasdaq
没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。通过订阅一个队列,消费者能够从队列中获取消息,然后在使用过后将其从队列中删除。
这个特点告诉我们,在系统中,任意队列都可以和默认的direct交换器绑定在一起,只要其routing-key等于队列名字。
默认绑定器的行为揭示了多绑定器的存在,将一个或者多个队列和一个或者多个交换器绑定起来。这使得可以将发送到不同交换器的具有不同routingkey(或者其他属性)的消息发送到同一个队列中。
这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉–它们只是短暂地连接到Broker,但是可以用于实现例如RPC或者在AMQ上的对等通信。
队列也可以是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并不是发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,然后在使用完之后删除掉。
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routingkey为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
rabbitmq-server-generic-unix-3.0.0.tar.gzRabbitMQ服务端
rabbitmq-java-client-bin-3.0.0.tar.gzRabbitMQ客户端,包含性能测试脚本
RabbitMQ客户端与服务端的安装直接解压安装包即可,客户端的目录中,rabbitmq-client.jar为JAVA版的客户端,编写客户端程序时需要引用,脚本文件为性能测试脚本
rabbitmq-pluginsenablerabbitmq_management打开WEB管理界面插件,默认访问地址:
一条以quick.orange.rabbit为routeKey的消息,Q1和Q2都会收到,lazy.orange.elephant也是。quick.orange.fox只能发送到Q1,lazy.brown.fox只能到Q2.lazy.pink.rabbit虽然符合两个匹配规则,但只发送到Q2,因为先匹配的lasy.#规则。quick.brown.fox则Q1和Q2都收不到,会被直接丢弃。
以上示例都是异步的,即生产者不需要等待消费者的反馈。在实际情况中,有些时候在消息处理比较快,且需要及时反馈时,则需要同步的方式,生产者发送消息,在收到消费者的反馈前一直处于阻塞状态。因为等待的返回来自远程主机,这种方式也被称为RPC(Remoteprocedurecall)。RPC的实现有很多,比如JAVA平台下的RMI,JMX。
当RPC客户端发送消息时,设置replyTo和correlationId参数。replyTo参数为反馈队列的名称,correlationId作为一次请求的唯一标识,要每次请求都不同,用于关联服务端的反馈消息
服务端等待请求,当收到请求后,处理请求,并将反馈通过replyTo指定的反馈队列发送回去
客户端收到反馈,并校验correlationId的值是否与发送的一致,如果一致,则一次请求完成
为保证消息的可靠传递,服务器使用持久化保证消息不丢失。包括exchange与queue必须定义为持久的,同时发送消息时,也要设置消息为持久消息。
生产者的消息确认叫做confirm,confirm确保消息已经发送到MQ中。当connection或channel异常时,会重新发送消息,如果消息是持久的,并不能一定保证消息持久化到磁盘中,因为消息可能存在与磁盘的缓存中。为进一步提高可靠性,可以使用事务。Confirm与事务不能同时使用。
当生产者收不到confirm时,消息可能会重复,所以如果消息不允许重复,则消费者需要自己实现消息去重。
消费者的消息确认叫做Acknowledgements,Acknowledgements确保消费者已经处理了消息,如果收不到消费者的Acknowledgements,MQ会重新发送消息。
同样,MQ也可能收不到消费者的Acknowledgements,就会重复发送消息,若要避免,消费者需要自己实现消息去重。
RabbitMQ提供了3中分布式的解决方案,cluster,federation,shovel。cluster用于可靠的本地局域网,后两种用于不可靠的网络。
Cluster将多台机器连接为一个逻辑broker,各机器之间使用Erlang消息通信,所以cluster中各机器必须有一样的Erlangcookie,并且机器之间的网络要是可靠的,并且都运行相同版本的Erlang。
Virtualhosts,exchanges,用户及权限都在所有节点同步,queues可以位于本机,也可以作为镜像队列,在各个机器之间同步。
Federation允许一个exchange从另外一台机器或者cluster的exchange中接收消息,因为是两个exchange联合起来,所以必须有相同的用户权限。
Federation是从一个exchange到另一个exchange,而Shovel是从一边的queue中取走消息并发送到另一个exchange。
通常在通过连接broker的时,并且需要获得比Federation更多控制权的时候使用Shovel。
当生产者发送消息的速率大于消息被路由到queue的速率时,会触发流量控制,发送速率受到限制,但不会完全阻塞。
当内存使用达到vm_memory_high_watermark的值时,会触发流量控制,生产者被阻塞。vm_memory_high_watermark的默认值是系统内存的40%,这个值可以在配置文件中修改。
或者在运行时通过命令rabbitmqctlset_vm_memory_high_watermarkfraction修改,修改立即生效,但下次重启后恢复。所以要永久修改,必须同时修改配置文件。
当磁盘剩余空间小于disk_free_limit的值时,触发流量控制,生产者被阻塞。disk_free_limit的默认值是1GB,可在配置文件中修改。
通过命令rabbitmqctlstatus可以查看内存使用状态,或者在WEB管理界面中点击节点后查看。
Mnesia表示MQ中定义的exchange,queue,bindings,用户及权限占用的内存
RabbitMQ的默认配置在大部分情况下是最佳配置,如果服务运行良好,不需要修改。RabbitMQ支持3种方式修改配置:环境变量、配置文件、运行时参数与策略。
环境变量可以配置到shell环境变量中,也可以在RabbitMQ的环境变量中配置。例如:配置服务绑定IP,可以在shell环境变量里配置RABBITMQ_NODE_IP_ADDRESS的值,也可以在RabbitMQ的环境变量中配置NODE_IP_ADDRESS的值,即RabbitMQ的环境变量中变量名称要去掉RABBITMQ_。RabbitMQ的环境变量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的优先级为shell环境变量优先于RabbitMQ的环境变量,RabbitMQ的环境变量优先于RabbitMQ默认的环境变量。
通过rabbitmqctl命令可以在运行时修改配置,例如修改vm_memory_high_watermark。还有些配置,比如镜像队列,是通过管理界面或命令配置策略实现的。
也可以在集群的基础上配置主从备份。主从备份依赖Pacemaker来管理资源,主从备份的方式已不推荐使用,而镜像队列则更容易使用,且可靠性更高。
虽然使用cluster可以提高可靠性,exchange,binding在各个机器是共享的,但是queue中的消息实际上还是存在单独的机器,如果一台机器不可用,那么在这台机器恢复前,这台机器中存储的消息也是不可用的。
为解决这样的问题,引入了镜像队列,镜像队列是在集群中为队列建立的一个或多个物理镜像,这些镜像分别存储在主节点之外的其他节点,所有节点中的队列共同组成一个逻辑队列。将一个队列做镜像后,即使此机器不可用,RabbitMQ会自动从镜像中选择一个继续使用,不会导致队列中的消息不可用。
如果为一个队列建立多个镜像,前者称为主节点,后者称为从节点。如果主节点有问题,那么RabbitMQ会从从节点中选择最早同步的一个作为新的主节点,以保证尽量不丢失消息,然而原主节点中同步之前的消息还是会丢失。
镜像队列运行在cluster中,不建议通过WAN使用,也就是不建议在Federation和Shovel中使用。
镜像队列是通过策略配置的,添加一个策略,匹配相应的队列,然后指定一个key为ha-mode的参数,例如:
这个策略设置所有的节点都为ha.开头的队列做镜像。这个设置也可以在管理界面中添加,详细信息请参考
硬件环境:CPU::Intel(R)Core(TM)i5-2400CPU@3.10GHz
rabbitmq-server-generic-unix-3.0.0.tar.gz(单台)
producerconsumerconfirm(maxunconfirmedpublishes100)ackpersistentthroughput(msg/s)
/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
RabbitMQ中的队列性能是一个值得关注的地方。在设计方案时就应该考虑到。队列只有在保持队列中不积压消息时,性能才是最佳的,队列中积压的消息越多,性能下降越多。
例如生产者发送消息的速度是600msg/s,消费者接收的速度是1200msg/s,正常情况下,是没有性能问题的。这时如果停止消费者一段时间,让消息在队列中积压,然后在打开消费者。按理消费者的速度大于生产者速度,可以转发新消息,并把老消息也取走,最终队列又回到为空的状态。但实际情况则不是,队列中的消息会继续积压,而且会继续变多,而这时消费者的速度就不如之前的了。
RabbitMQ中的队列,在实现上又分为多个小的队列,每个队列里存储着不同状态的消息。当消息不积压时,消息由交换器到达队列,就会被直接发送给消费者。而当消息堆积时,由于占用较多内存,RabbitMQ会把消息放入更深层次的队列,例如将内存中的消息换出到磁盘上(不管消息是否持久化),而这些操作会消耗更多的CPU等系统资源,从而导致影响队列中消息的发送。
停止发送消息,让系统资源都集中到向消费者发送消息,队列中的消息逐渐减少,队列最终会恢复至为空状态。
有些时候不能停止生产者,这时可以改变绑定,让新消息发送到新的队列,新队列必须位于新的机器上。当然也需要新的消费者来连接。这样可以让老队列中的消息慢慢取走,也不影响新消息的发送。
默认的集群模式下,虽然消息可以发送到一台机器,然后从另一台机器取出,但是因为每台机器的queue实际上消息是本地存储,所以消息发到A的queue,从B中取,首先需要从A再次发送到B中,这样会导致取消息的效率不高。
如果使用镜像模式,A中的消息会同步到B中,消费者从B中取消息,消息是从本地取了,但是队列做镜像依然对性能影响很大,尤其是镜像的数目增加,性能会成倍下降。镜像队列优于普通模式的地方在于可靠性,普通模式中,A如果有故障,那么A中的消息就无法取出。镜像模式中,A有故障,消息依然可以从B中取出。
以下是我们生产环境的集群配置方案,因为对于吞吐量要求很高,单台RabbitMQ无法满足性能要求,所以选择使用cluster,而镜像模式对于性能影响很大,只能采取其他方案:假设3台RabbitMQ组成一个集群。然后建立多个queue,exchange使用direct类型,并绑定所有queue,routeKey为0到2(和MQ的数量一致)中随机发送。生产者发送消息到exchange,并路由到各个queue,消费者也有多个,同时从各个queue获取消息。生产者与消费者使用多channel提高速度,同时消费者使用异步接收方式。
使用多个队列,可以显著提高集群的吞吐量,每个队列要位于不同的物理机器上。考虑性能优先,也取消了消息持久化。但是在可靠性方面,如果某个队列不可用,那么发送给这个队列的消息就会被丢弃。为避免这种情况,采用备用绑定与备用队列的方式,即建立多个绑定,默认情况exchange通过routeKey0,1,2绑定队列a,b,c(橙色线路),备用绑定是exchange通过routeKey0,1,2绑定队列d(紫色线路)。比如当队列a不可用时,默认的绑定routeKey为0的消息就无法发送到a队列,这时备用策略自动生效,routeKey为0的消息会被发送到队列d上(走紫色线路),routeKey为1和2的消息照常发到b和c(还是橙色线路)。这样就可以确保消息不丢失。若要进一步提高可靠性,降低备用队列的压力,可以建立多个备用队列,然后将绑定分散开来。
[5]/clevertanglei900@126/blog/static/121041853/
[6]/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
[7]/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
[8]/blog/2011/10/27/performance-of-queues-when-less-is-more/