在熟悉了AMQP的相关概念之后,再学习RabbitMQ就比较容易了。毕竟RabbitMQ是对AMQP的实现,并且RabbitMQ提供了包括c/c++, python, java等众多语言可供开发者选择使用。
1.RabbitMQ
RabbitMQ是一个消息中间件,用来接收和转发消息。相关术语我们在前面的章节中已经介绍过了,比如:生产者、消费者、交换器、队列等等。
生产者用来生产消息
生产者生产的消息经过交换器路由之后,会进入相应的队列中去
队列说白了就是一个很大的消息缓冲区,它只受限于主机的内存和磁盘的约束。消费者从这些队列中去接收消息进行消费
基于上述三个实体,一个简单的生产者-消费者模型表示如下,这也正是RabbitMQ总体设计的理念:
因为AMQP是一种网络协议,所以生产者、消息代理(message broker, 主要负责消息的接收、转发)以及消费者可以分布在不同的主机上。
2.Hello World!
使用python来实现图1.4模型,P生产消息至队列hello中,再由C消费。需要说明的是,使用python语言进行RabbitMq客户端程序的编写需要导入pika模块。
(1)生产消息
1)首先,建立AMQP连接。因为AMQP是一种网络协议,所以使用RabbitMQ的第一步就是建立连接:
import pika import time credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel()
其中,
a)连接参数只指定了host,在前面介绍AMQP的章节中,我们知道建立一个AMQP连接还需要一些其他参数。如vhost参数这里使用了默认值’/’,port使用默认值5672端口,用户名和密码使用默认值’guest’等等。
b)同样地,在介绍AMQP时,我们知道建立连接之后,进行消息的生产之前,我们还必须要打开一个通道。
2)接着,声明一个队列:
channel.queue_declare(queue='hello')
其中,指定了队列的名字为hello。当然,你也可以不去声明这个队列,让消费者去声明。但是这样你将面临消息被丢弃的风险(如果消费者应用程序比生产者应用程序先执行,可以规避这个风险)。
3)然后,生产消息,这里生产了5w条消息:
for i in range(0, 500000): channel.basic_publish(exchange='', routing_key='hello', body='Hello World! {}'.format(i)) # time.sleep(1) print("Send 'hello world'") connection.close()
其中,
a)指定默然交换器exchange=””。在对AMQP进行学习时,我们知道:生产者生产的消息并不是直接生产至队列中去的,而是经过交换器,再由交换器路由至相应的队列中。默认交换器的特性可参考前一篇文章。
b)指定消息的路由关键字为”hello”。介于默认交换器的特性,此处指定的路由关键字”hello”其实就是待路由队列的名字。
c)生产的消息内容为”Hello World!”。
4)最后,关闭连接:
connection.close()
(2)消费消息
1)首先,建立连接。跟消费者一样,生产者也需要向消息代理发起AMQP连接。
2)接着,声明队列:
channel.queue_declare(queue='hello')
3)然后,进行消费:
def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
其中,
a)定义回调函数callback(),指定消费消息时的消费行为。
b)进行消费。指定回调函数、队列名等。
c)此处未关闭连接,消费者将一直等待消息。
(3)运行
生产者源码 send.py:
import pika import time credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello') for i in range(0, 500000): channel.basic_publish(exchange='', routing_key='hello', body='Hello World! {}'.format(i)) # time.sleep(1) print("Send 'hello world'") connection.close()
消费者源码 receive.py:
import pika credentials = pika.PlainCredentials('admin','admin-pass') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
先执行消费者应用程序,再执行生产者应用程序:
python receive.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Hello World!' python send.py # => [x] Sent 'Hello World!'
3.其他
本文主要介绍了RabbitMQ的基本使用方法:建立连接、声明队列、生产/消费消息以及关闭连接内容。
参考:https://www.rabbitmq.com/getstarted.html
参考:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
参考:https://dulishu.top/category/rabbitmq/
评论前必须登录!
注册