前面介绍了简单的RabbitMQ示例:一个生产者、一个消费者。然而,面对复杂耗时的任务时,为了提高工作效率,往往会使用多个消费者进行消费。此时的工作队列(Work Queue)需要将消息分发给多个消费者。
为了模拟出复杂、耗时的任务,我们假设每条字符串消息代表一项任务,并且字符串中的点(.)代表该项任务耗时的多少(每个点代表1秒)。比如,”hello world…”表示这项任务将耗时3秒钟完成。
1.生产者应用程序
(1)建立连接,跟前面的hello world示例如出一辙:
(2)声明队列:
channel.queue_declare(queue='task_queue', durable=True)
其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。
(3)生产消息:
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, #make message persisent ))
其中,
1)指定默认交换器为’’,指定路由关键字为’task_queue’;
2)指定消息属性为2,即持久化消息。
(4)关闭连接:
connection.close()
2.消费者应用程序
(1)建立连接,同上。
(2)声明队列,同上。
(3)定义消费信息时的回调函数:
def callback(ch, method, properities, body): print("Received %r" % body) time.sleep(body.count(b'.')) print("Done") ch.basic_ack(delivery_tag = method.delivery_tag)
其中,
1)根据消息字符串中点的个数,休眠相应的时间;
2)消费完消息之后,返回确认状态。
(4)预取消息:
channel.basic_qos(prefetch_count=1)
预取消息的条数为1。
(5)开始消费
channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
3.消息确认
对比前文中的示例1和此文中的示例2中的回调函数callback()会发现稍有不同。
示例1中显示的关闭的消息确认机制,这是因为,默认情况下消息确认机制是开启的:
channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True)
示例2未显示关闭,则表示启用了消息确认机制,这保证了消费者应用程序在处理消息的过程中若异常崩溃,不会丢失当前处理的消息。因为,消费者应用程序崩溃之后,消息代理(message broker)无法收到该消息返回的ack,那么它会重新将这条消息发送至队列中去(re-queue),这样的话,其他的消费者就可以接着处理这条消息了。
简单地说,启动了消息确认机制之后,消息代理在没有收到ack之前,该条消息不会从消息代理中移除。
4.消息持久性
消息确认机制使得在某个消费者挂掉之后,该消息仍然能够得到其他的消费者进行正常处理。然而,若RabbitMQ服务异常关闭,就没有这么幸运了。
所以,若想在RabbitMQ服务重启之后,队列和消息不会丢失,则需要将它们的属性设置成持久化。
队列持久化:
channel.queue_declare(queue='task_queue', durable=True)
消息持久化
channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ))
其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。
1)将队列和消息都设置为持久化以后,rabbitmq服务重启后队列和消息都还在
2)默认消费方的auth_ack=False,如果消费方消息处理完成后没有给rabbitmq服务手动返回ack,则该消息依旧在服务端队列中,unacked消息数+1。当消费方断开连接后,unacked-1,ready+1
3)当没有设置队列和消息持久化,rabbitmq重启后消息会丢失
4)当设置队列和消息持久化,rabbitmq重启后消息不会丢失
5)数据存储:/var/lib/rabbitmq/mnesia/rabbit@master-21/msg_stores/vhosts
6)当队列的消息被消费后,磁盘里的数据大小会减少
5.预取消息
假设有编号为1-10的十条消息(任务),其中奇数编号的任务耗时很长,偶数编号的任务耗时很短。消费者C1会被分配奇数编号的任务,消费者C2会被分配执行偶数编号的任务,这会导致一种情况:C2会很快的执行完所有的偶数编号任务,C1仍然在执行第1条任务,而第3、5、7、9虽然已经分配给了C1却在等待着被执行。
此时,即使空闲的C2却没法帮C1分担任务。
使用预取消息功能,可以解决类似问题,
设置消费者每次从队列里预取一条消息。这样一来,RabbitMQ每次只会发送一条消息给消费者,并且在消费者处理完该条消息之前(或者说在返回ACK之前),消费者不会再接收到其他消息:
channel.basic_qos(prefetch_count=1)
6.示例
(1)生产者应用程序
$cat send.py import pika import time import sys credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() message = ' '.join(sys.argv[1:]) channel.queue_declare(queue='task_queue', durable=True) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print("Sent %r" % message) connection.close()
下面有两个消费者
$cat receive-1.py import pika import time credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print('Waiting for message. To exit press CTRL+C') def callback(ch, method, properties, body): print("Received %r" % body) time.sleep(body.count(b'.')) print("Done") # ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) print("waiting...") channel.start_consuming() $cat receive-2.py import pika import time credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print('Waiting for message. To exit press CTRL+C') def callback(ch, method, properties, body): print("Received %r" % body) time.sleep(body.count(b'.')) print("Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) print("waiting...") channel.start_consuming()
测试
发送5条消息 (test) D:\python测试\test>python send.py hello. (test) D:\python测试\test>python send.py hello.. (test) D:\python测试\test>python send.py hello... (test) D:\python测试\test>python send.py hello.... (test) D:\python测试\test>python send.py hello..... 界面显示 Ready:5, Unacked:0, Total:5 消费方1消费 (test) D:\python测试\test>python receive-1.py 界面显示 Ready:4, Unacked:1, Total:5 因为没有应答,表示没有处理完 消费方1断开连接 界面显示 Ready:5, Unacked:0, Total:5 消费方2消费 (test) D:\python测试\test>python receive-2.py 界面显示 Ready:0, Unacked:0, Total:0 因为有应答,所有都处理完成了 将receive-1.py中的手动应答打开,两个receive同时执行,则两个会按顺序分别处理没有消费的数据,负载均衡
本文主要学习了RabbitMQ的任务分发机制,介绍了队列持久性、消息持久性、消息确认以及预取消息等特性,并给出了相应的编码示例。
参考:https://www.rabbitmq.com/tutorials/tutorial-two-python.html
参考:https://dulishu.top/rabbitmq-work-queues/
评论前必须登录!
注册