路漫漫其修远兮
吾将上下而求索

rabbitmq学习(5):Topic

相比fanout类型,基于direct类型的交换器实现的日志记录系统,可以让我们有选择的接收日志。而在其他的一些场景下,direct类型交换器仍然不能够满足人们日益增长的需求。比如:不仅需要对日志的级别进行过滤,还要对日志源进行过滤。

1.syslog消息格式

syslog在记录日志的时候,有着严格的日志格式(类似于Facility.Level)。比如,user.info表示user产生的info级别的消息;kern.error表示kernel产生的error级别的消息

所以,如果想要实现一个不仅可以过滤日志级别,还可以过滤日志源的进阶日志记录系统,direct类型的交换器就显得无助了。topic类型的交换器可以满足这一需求。

2.topic类型交换器

该类型的交换器与direct类型的交换器的相同点是,它们都是根据routing_key来进行匹配过滤消息的。(生产者生产消息时指定的routing_key,以及消费者绑定队列与交换器时使用的routing_key)

二者主要的不同点在于:

(1)direct交换器的routing_key是一个任意单词,比如error或info等;而topic类型交换器的routing_key是多个单词,单词之间用点(.)分隔,比如user.info或kern.error等。

(2)对于消费者绑定时指定的routing_key,AMQP提供了两个特殊的符号:

1)*:可以用来替代任意一个单词;

2)#:可以用来替代零个或多个单词;

有了这两个特殊的符号,给topic类型的交换器在进行规制匹配时增加了很大的灵活性,用图2-1来直观感受一下:topic2-1

5.png

若消息的routing_key是“quick.orange.rabbit”,那么这条消息将被发送至Q1和Q2;若消息的routing_key是“quick.orange.fox”,那么这条消息将被发送至Q1;若消息的routing_key是“lazy.brown.fox”,那么这条消息将被发送至Q2;若消息的routing_key是“quick.brown.fox”,该条消息将被丢弃。

3.示例

了解这些之后,使用topic类型的交换器就可以实现一个用来过滤类似于syslog日志格式的日志记录系统了。

$cat emit_log_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()


$cat receive_logs_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

测试

To receive all the logs run:
python receive_logs_topic.py "#"

To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"

You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"

参考:https://dulishu.top/rabbitmq-topic/

参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

未经允许不得转载:江哥架构师笔记 » rabbitmq学习(5):Topic

分享到:更多 ()

评论 抢沙发

评论前必须登录!