相比fanout类型,基于direct类型的交换器实现的日志记录系统,可以让我们有选择的接收日志。而在其他的一些场景下,direct类型交换器仍然不能够满足人们日益增长的需求。比如:不仅需要对日志的级别进行过滤,还要对日志源进行过滤。
1.syslog消息格式
syslog在记录日志的时候,有着严格的日志格式(类似于Facility.Level)。比如,user.info表示user产生的info级别的消息;kern.error表示kernel产生的error级别的消息
所以,如果想要实现一个不仅可以过滤日志级别,还可以过滤日志源的进阶日志记录系统,direct类型的交换器就显得无助了。topic类型的交换器可以满足这一需求。
2.topic类型交换器
该类型的交换器与direct类型的交换器的相同点是,它们都是根据routing_key来进行匹配过滤消息的。(生产者生产消息时指定的routing_key,以及消费者绑定队列与交换器时使用的routing_key)
二者主要的不同点在于:
(1)direct交换器的routing_key是一个任意单词,比如error或info等;而topic类型交换器的routing_key是多个单词,单词之间用点(.)分隔,比如user.info或kern.error等。
(2)对于消费者绑定时指定的routing_key,AMQP提供了两个特殊的符号:
1)*:可以用来替代任意一个单词;
2)#:可以用来替代零个或多个单词;
有了这两个特殊的符号,给topic类型的交换器在进行规制匹配时增加了很大的灵活性,用图2-1来直观感受一下:topic2-1
若消息的routing_key是“quick.orange.rabbit”,那么这条消息将被发送至Q1和Q2;若消息的routing_key是“quick.orange.fox”,那么这条消息将被发送至Q1;若消息的routing_key是“lazy.brown.fox”,那么这条消息将被发送至Q2;若消息的routing_key是“quick.brown.fox”,该条消息将被丢弃。
3.示例
了解这些之后,使用topic类型的交换器就可以实现一个用来过滤类似于syslog日志格式的日志记录系统了。
$cat emit_log_topic.py #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() $cat receive_logs_topic.py #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
测试
To receive all the logs run: python receive_logs_topic.py "#" To receive all logs from the facility "kern": python receive_logs_topic.py "kern.*" Or if you want to hear only about "critical" logs: python receive_logs_topic.py "*.critical" You can create multiple bindings: python receive_logs_topic.py "kern.*" "*.critical" And to emit a log with a routing key "kern.critical" type: python emit_log_topic.py "kern.critical" "A critical kernel error"
参考:https://dulishu.top/rabbitmq-topic/
参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html
评论前必须登录!
注册