路漫漫其修远兮
吾将上下而求索

rabbitmq学习(2):Work Queues

前面介绍了简单的RabbitMQ示例:一个生产者、一个消费者。然而,面对复杂耗时的任务时,为了提高工作效率,往往会使用多个消费者进行消费。此时的工作队列(Work Queue)需要将消息分发给多个消费者。

5.png

为了模拟出复杂、耗时的任务,我们假设每条字符串消息代表一项任务,并且字符串中的点(.)代表该项任务耗时的多少(每个点代表1秒)。比如,”hello world…”表示这项任务将耗时3秒钟完成。

1.生产者应用程序

(1)建立连接,跟前面的hello world示例如出一辙:

(2)声明队列:

channel.queue_declare(queue='task_queue', durable=True)

其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。

(3)生产消息:

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                        delivery_mode = 2, #make message persisent
                      ))

其中,

1)指定默认交换器为’’,指定路由关键字为’task_queue’;

2)指定消息属性为2,即持久化消息。

(4)关闭连接:

connection.close()

2.消费者应用程序

(1)建立连接,同上。

(2)声明队列,同上。

(3)定义消费信息时的回调函数:

def callback(ch, method, properities, body):
        print("Received %r" % body)
        time.sleep(body.count(b'.'))
        print("Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)

其中,

1)根据消息字符串中点的个数,休眠相应的时间;

2)消费完消息之后,返回确认状态。

(4)预取消息:

channel.basic_qos(prefetch_count=1)

预取消息的条数为1。

(5)开始消费

channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()

3.消息确认

对比前文中的示例1和此文中的示例2中的回调函数callback()会发现稍有不同。

示例1中显示的关闭的消息确认机制,这是因为,默认情况下消息确认机制是开启的:

channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

示例2未显示关闭,则表示启用了消息确认机制,这保证了消费者应用程序在处理消息的过程中若异常崩溃,不会丢失当前处理的消息。因为,消费者应用程序崩溃之后,消息代理(message broker)无法收到该消息返回的ack,那么它会重新将这条消息发送至队列中去(re-queue),这样的话,其他的消费者就可以接着处理这条消息了。

简单地说,启动了消息确认机制之后,消息代理在没有收到ack之前,该条消息不会从消息代理中移除。

4.消息持久性

消息确认机制使得在某个消费者挂掉之后,该消息仍然能够得到其他的消费者进行正常处理。然而,若RabbitMQ服务异常关闭,就没有这么幸运了。

所以,若想在RabbitMQ服务重启之后,队列和消息不会丢失,则需要将它们的属性设置成持久化。

队列持久化:

channel.queue_declare(queue='task_queue', durable=True)

消息持久化

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))

其中,跟第一个示例稍有不同的是,此处将这个队列的durability属性设置为durable=True,即持久性队列。  

1)将队列和消息都设置为持久化以后,rabbitmq服务重启后队列和消息都还在

2)默认消费方的auth_ack=False,如果消费方消息处理完成后没有给rabbitmq服务手动返回ack,则该消息依旧在服务端队列中,unacked消息数+1。当消费方断开连接后,unacked-1,ready+1

3)当没有设置队列和消息持久化,rabbitmq重启后消息会丢失

4)当设置队列和消息持久化,rabbitmq重启后消息不会丢失

5)数据存储:/var/lib/rabbitmq/mnesia/rabbit@master-21/msg_stores/vhosts

6)当队列的消息被消费后,磁盘里的数据大小会减少

  

5.预取消息

假设有编号为1-10的十条消息(任务),其中奇数编号的任务耗时很长,偶数编号的任务耗时很短。消费者C1会被分配奇数编号的任务,消费者C2会被分配执行偶数编号的任务,这会导致一种情况:C2会很快的执行完所有的偶数编号任务,C1仍然在执行第1条任务,而第3、5、7、9虽然已经分配给了C1却在等待着被执行。

6.png

此时,即使空闲的C2却没法帮C1分担任务。

使用预取消息功能,可以解决类似问题,

设置消费者每次从队列里预取一条消息。这样一来,RabbitMQ每次只会发送一条消息给消费者,并且在消费者处理完该条消息之前(或者说在返回ACK之前),消费者不会再接收到其他消息:

channel.basic_qos(prefetch_count=1)

6.示例

(1)生产者应用程序

$cat send.py
import pika
import time
import sys

credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

message = ' '.join(sys.argv[1:])

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
	
print("Sent %r" % message)
connection.close()

下面有两个消费者

$cat receive-1.py
import pika
import time

credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('Waiting for message. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("Received %r" % body)
    time.sleep(body.count(b'.'))
    print("Done")
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print("waiting...")
channel.start_consuming()


$cat receive-2.py
import pika
import time

credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('Waiting for message. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("Received %r" % body)
    time.sleep(body.count(b'.'))
    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print("waiting...")
channel.start_consuming()

测试

发送5条消息
(test) D:\python测试\test>python send.py hello.
(test) D:\python测试\test>python send.py hello..
(test) D:\python测试\test>python send.py hello...
(test) D:\python测试\test>python send.py hello....
(test) D:\python测试\test>python send.py hello.....
界面显示 Ready:5, Unacked:0, Total:5

消费方1消费
(test) D:\python测试\test>python receive-1.py
界面显示 Ready:4, Unacked:1, Total:5
因为没有应答,表示没有处理完

消费方1断开连接
界面显示 Ready:5, Unacked:0, Total:5

消费方2消费
(test) D:\python测试\test>python receive-2.py
界面显示 Ready:0, Unacked:0, Total:0
因为有应答,所有都处理完成了

将receive-1.py中的手动应答打开,两个receive同时执行,则两个会按顺序分别处理没有消费的数据,负载均衡

本文主要学习了RabbitMQ的任务分发机制,介绍了队列持久性、消息持久性、消息确认以及预取消息等特性,并给出了相应的编码示例。

参考:https://www.rabbitmq.com/tutorials/tutorial-two-python.html

参考:https://dulishu.top/rabbitmq-work-queues/

未经允许不得转载:江哥架构师笔记 » rabbitmq学习(2):Work Queues

分享到:更多 ()

评论 抢沙发

评论前必须登录!