前一篇文章学习了工作队列,细心的你或许会发现:生产者生产的每一条特定的消息要么被消费者C1消费,要么被消费者C2消费。也就是说,一条消息只能被一个消费者消费。那么,多个消费者如何消费同一条消息呢?
1.fanout类型交换器
我们将“向多个消费者发送同一条消息”的模式称为“发布/订阅”模式。在RabbitMQ中实现这种模式,依赖于特定类型的交换器。我们知道,生产者生产的消息需要经过交换器,再由交换器路由到指定的队列中。交换器可以将某条消息发送给一个队列或多个队列,同样也可以将它丢弃,这些都依赖于交换器的类型。
对于fanout类型的交换器,它的实现机制类似于广播机制,会将每一条消息广播给与之绑定的每一个队列。
(1)声明fanout类型交换器
channel.exchange_declare(exchange='logs', exchange_type='fanout')
在前面的两个示例中,并没有显示的声明一个交换器,而是使用默认的交换器。此处,我们显示地声明一个fanout类型的交换器logs。该类型的交换器将会把接收到的所有消息广播给所有与之绑定的队列。
(2)向交换器中发送消息
channel.basic_publish(exchange='logs', routing_key='', body=message)
介于fanout类型交换器的特性,此处发布消息并没有指定路由关键字routing_key。
至此,我们已经学会使用两种类型的交换器了——direct、fanout,前面两个示例使用的是默认交换器,默认交换器的类型是direct。
2.临时队列
前面两个示例中,分别声明了“hello”和“task_queue”队列。此处我们声明一个临时队列:不再显示指定队列的名字(而是随机生成以amq.为前缀的队列名),并且将队列属性设置成exclusive——该属性的队列会随着声明该队列的消费者应用程序关闭而关闭:
channel.queue_declare(queue='', exclusive=True)
临时队列有两点好处:
(1)保证每次连接建立之后的队列都是一个全新的队列(队列中没有残留的未处理消息);
(2)一旦消费者断开连接,临时队列则会被自动删除。避免了消费者离线之后,消息仍然在队列中爆炸式的增长浪费磁盘空间。
3.绑定
将队列与fanout类型交换器绑定,实现了“发布/订阅”模式。这样一来,每当交换器接收到消息之后,就会将消息“发布”给所有“订阅”它的队列中去。
临时队列与fanout类型交换器绑定:
result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name)
绑定操作需要交换器和队列的名字,交换器名字属性声明为了logs,而队列的名字是由消息代理随机生成,通过result.method.queue可以获得。
4.示例
介于临时队列的两个优点,实现一个简单的日志记录系统。该日志系统包括一个生产者用来产生日志;两个消费者,分别用来记录日志到本地和输出日志到屏幕。
(1)生产者emit_log.py
$cat send.py import pika import sys credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print("Sent %r" % message) connection.close()
(2)消费者
$cat receive-1.py receive-2.py也一样 import pika credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue="", exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print('Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print("%r" % body) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
测试
发送消息 (test) D:\python测试\test>python send.py 1 接收消息 (test) D:\python测试\test>python receive-1.py (test) D:\python测试\test>python receive-2.py 当连接断开后,对应的队列会被自动删除,不会积压过多的消息
下面将队列换成固定名字的,下面是一个消费者,另外一个消费者的队列名不同
换成固定名字的队列后,即使消费者断开连接,队列还是存在的,有新的消息还是会被发送到对应的队列中,可能会造成积压
import pika credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') channel.queue_declare(queue='task_queue1', durable=True) channel.queue_bind(exchange='logs', queue='task_queue1') print('Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print("%r" % body) channel.basic_consume(queue='task_queue1', on_message_callback=callback, auto_ack=True) channel.start_consuming()
参考:https://dulishu.top/rabbitmq-publishsubscribe/
参考:https://www.rabbitmq.com/tutorials/tutorial-three-python.html
评论前必须登录!
注册