与fanout类型的交换器绑定了的队列,会收到所有经过该交换器的消息,即fanout类型的交换器会转发所有消息到与之绑定的队列中。那么,如果消费者应用程序只想接收经过交换器的一部分消息,该怎么办呢?
1.再谈绑定
前面的章节中,我们已经介绍过了绑定的概念。聪明的你一定能总结出以下绑定相关的性质:
(1)默认交换器,不需要显示地将其与队列进行绑定。因为,所有队列都会自动地用其队列名与交换器进行绑定,并且默认交换器是direct类型;
(2)fanout类型的交换器,与队列进行绑定时,无需指定路由关键字(routing_key)。因为fanout类型的交换器会广播消息至所有绑定的队列。
指定路由关键字routing_key进行绑定的代码形式如下所示:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
它表明,经过交换器的消息将根据routing_key的值进行匹配、转发给特定的队列。
2.direct类型交换器
前一篇文章实现的日志记录系统会将产生的所有日志消息,分别输入到本地文件和标准输出中。现在,假设这种场景:日志记录系统只会将危险级别较高(比如error级别)的日志消息写入本地并打印输出,而其他级别(比如warning, info)的日志消息只打印输出即可。
依据假设的场景,我们需要声明若干队列,其中一个队列用来记录error级别的消息,并输出至本地文件中;其他队列用来记录error, warning和info级别的消息,打印输出。
针对上述场景,fanout类型的交换器就显得无能为力了。但是,direct类型的交换器可以完美解决这个问题,因为该类型的交换器会将消息的路由关键字与队列绑定时的路由关键字进行匹配,然后再将消息转发给匹配上的队列。direct类型交换器,
其中,队列Q1通过关键字orange与交换器X绑定;队列Q2分别通过关键字black,green与交换器X绑定。这样一来,消息路由关键字为orange的消息会被发送到Q1中;消息路由关键字为black或green的消息会被发送到Q2中;其他路由关键字的消息则会被丢弃。
需要提及的一点是:多个队列可以使用同一个路由关键字与交换器进行绑定,
3.示例
基于上述假设的场景,实现一个升级版的日志记录系统:对error级别的日志消息记录到本地并打印输出,其他级别的消息打印输出即可
(1)生产者emit_log_direct.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
(2)消费者receive_logs_direct.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
参考:
评论前必须登录!
注册