RibbitMQ初识

几个概念说明:

创新互联公司是一家集网站建设,云安企业网站建设,云安品牌网站建设,网站定制,云安网站建设报价,网络营销,网络优化,云安网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

Broker:简单来说就是消息队列服务器实体。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。

(2)客户端声明一个exchange,并设置相关属性。

(3)客户端声明一个queue,并设置相关属性。

(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

(5)客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

(1)exchange持久化,在声明时指定durable => 1

(2)queue持久化,在声明时指定durable => 1

(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

1.必需掌握的指令

添加用户:

rabbitmqctl add_user rainbird password

添加权限:

rabbitmqctl set_permissions -p "/" rainbird ".*" ".*" ".*"

删除测试用户:

rabbitmqctl delete_user guest

所有指令列表(很简单的英文):

add_user

delete_user

change_password

list_users

add_vhost

delete_vhost

list_vhosts

set_permissions [-p ]

clear_permissions [-p ]

list_permissions [-p ]

list_user_permissions

list_queues [-p ] [ ...]

list_exchanges [-p ] [ ...]

list_bindings [-p ]

list_connections [ ...]

2.vhost / 不能删除

删除/以后,新建立的vhost不能正常使用(即便不删除/,新建立的vhost也是不能正常使用).不知道为什么,有待研究.

3.关于持久化

示例里没有一点儿和持久化相关的东东,而这却是笔者最关心的,想想作为消息服务器如果不能保证消息一定被接收到,算什么事儿啊?比着网上狂转的python版本从php-amqp的库里一点一点儿翻,找到了如下持久化的设置:

接收端声明队列和交换机自动建立:

$ch->queue_declare($_QUEUE,false,true,false,false);

第三个参数设置true保证服务器重启后,自动建立队列

第五个参数设置成false防止接收端没连接的时候丢失消息

$ch->exchange_declare($EXCHANGE, \'direct\', false, true, false);

第四个参数设置true保证重启后,自动建立交换机

第五个参数设置false防止接收端断开后,交换机被删除

发布端声明消息持久:

$message = new AMQPMessage(serialize($object), array(\'content_type\' => \'text/plain\', \'delivery_mode\' => 2));

同时满足了上面三个条件,就可以保证未接收的消息在服务器意外重启以后依然存在了.

4.持久化的后遗症

比如说你初始化了一个队列msgs.你会发现它真的持久了!每次服务器端重启后,通过list_queues命令查看的时候都存在.但是时间久了,这个msgs我们并不需要了,怎么办呢?笔者发现,想清除这个队列只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限.

rabbitmqctl delete_vhost /

rabbitmqctl add_vhost /

rabbitmqctl set_permissions -p / rainbird \'.*\' \'.*\' \'.*\'

要注意,如果这个操作过程中有接收端处于连接状态它们不会自动断开,但也不会再收到消息,需要手动重新连接一下.

5.关于修改监听ip和监听端口

出于一些需要,比如我们有多个ip,我们希望rabbitmq仅运行在指定的ip上.或者考虑到安全问题,我们希望修改一下rabbitmq的监听端口.默认安装完成以后,在/etc下面会有一个rabbitmq的空目录,这时候我们需要手工创建rabbitmq.conf,并写入相关内容.

vi /etc/rabbitmq/rabbitmq.conf

RABBITMQ_NODE_IP_ADDRESS=0.0.0.0

RABBITMQ_NODE_PORT=2222

保存以后重启服务就生效了.

这个东东网上又没介绍,翻了半天+无限尝试才搞出来.

6.关于运行接收端cpu100%问题

第一眼看到接收端会运行一个while等待消息的时候,笔者就知道这个进程肯定cpu占用会100%.在代码里几处while尝试添加usleep无效后,笔者最后还是在官方的问题列表里找到了答案:

vi +286 amqp_wire.inc

293 while ($read < $n && (false !== ($buf = fread($this->sock, $n - $read))))

294 {

295 usleep(50000);

296 $read += strlen($buf);

297 $res .= $buf;

298 }

笔者的出发点是对的,只是没找对while.可能有人会奇怪为什么要用usleep(50000)呢?实际上笔者有遇到运行php起来的daemon导致cpu100%的情况.当时笔者加的是usleep(500000)也就是半秒钟.这样就可以使进程看上去cpu占用为0.没想到再降一个数量级也是可以正常的,这次算赚到了.

7.学到了error_log函数

以前有见过这个函数,以为是向系统日志里写log的时候才用得到呢,没想到还可以像下面这样用:

function debug_msg($s)

{

//error_log($s);

}

在不同的地方写上debug_msg,最后不用的时候时候,直接注释掉error_log,不错的小技巧!

RabbitMQ将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于RabbitMQ的消息确认机制(Message acknowledgment)是否打开。

为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。

RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。

消息确认机制默认是打开的,除非你设置no_ack=True标记来手工关闭它。

通过如下命令查看系统里的未确认消息:

# rabbitmqctl list_queues -p /path name messages_unacknowledged

Listing queues …

queue_storm_actionlog 0

dev_queue_storm_actionlog 0

…done.


分享文章:RibbitMQ初识
文章链接:http://scyanting.com/article/cjiipe.html