路漫漫其修远兮
吾将上下而求索

rabbitmq学习(1):Hello World

在熟悉了AMQP的相关概念之后,再学习RabbitMQ就比较容易了。毕竟RabbitMQ是对AMQP的实现,并且RabbitMQ提供了包括c/c++, python, java等众多语言可供开发者选择使用。

1.RabbitMQ

RabbitMQ是一个消息中间件,用来接收和转发消息。相关术语我们在前面的章节中已经介绍过了,比如:生产者、消费者、交换器、队列等等。

生产者用来生产消息

1.png

生产者生产的消息经过交换器路由之后,会进入相应的队列中去

2.png

队列说白了就是一个很大的消息缓冲区,它只受限于主机的内存和磁盘的约束。消费者从这些队列中去接收消息进行消费

3.png

基于上述三个实体,一个简单的生产者-消费者模型表示如下,这也正是RabbitMQ总体设计的理念:

4.png

因为AMQP是一种网络协议,所以生产者、消息代理(message broker, 主要负责消息的接收、转发)以及消费者可以分布在不同的主机上。

2.Hello World!

使用python来实现图1.4模型,P生产消息至队列hello中,再由C消费。需要说明的是,使用python语言进行RabbitMq客户端程序的编写需要导入pika模块。

(1)生产消息

1)首先,建立AMQP连接。因为AMQP是一种网络协议,所以使用RabbitMQ的第一步就是建立连接:

import pika
import time
credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

其中,

a)连接参数只指定了host,在前面介绍AMQP的章节中,我们知道建立一个AMQP连接还需要一些其他参数。如vhost参数这里使用了默认值’/’,port使用默认值5672端口,用户名和密码使用默认值’guest’等等。

b)同样地,在介绍AMQP时,我们知道建立连接之后,进行消息的生产之前,我们还必须要打开一个通道。

2)接着,声明一个队列:

channel.queue_declare(queue='hello')

其中,指定了队列的名字为hello。当然,你也可以不去声明这个队列,让消费者去声明。但是这样你将面临消息被丢弃的风险(如果消费者应用程序比生产者应用程序先执行,可以规避这个风险)。

3)然后,生产消息,这里生产了5w条消息:

for i in range(0, 500000):
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World! {}'.format(i))
    # time.sleep(1)
print("Send 'hello world'")
connection.close()

其中,

a)指定默然交换器exchange=””。在对AMQP进行学习时,我们知道:生产者生产的消息并不是直接生产至队列中去的,而是经过交换器,再由交换器路由至相应的队列中。默认交换器的特性可参考前一篇文章。

b)指定消息的路由关键字为”hello”。介于默认交换器的特性,此处指定的路由关键字”hello”其实就是待路由队列的名字。

c)生产的消息内容为”Hello World!”。

4)最后,关闭连接:

connection.close()

(2)消费消息

1)首先,建立连接。跟消费者一样,生产者也需要向消息代理发起AMQP连接。

2)接着,声明队列:

channel.queue_declare(queue='hello')

3)然后,进行消费:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

其中,

a)定义回调函数callback(),指定消费消息时的消费行为。

b)进行消费。指定回调函数、队列名等。

c)此处未关闭连接,消费者将一直等待消息。

(3)运行

生产者源码 send.py:

import pika
import time
credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='hello')
for i in range(0, 500000):
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World! {}'.format(i))
    # time.sleep(1)
print("Send 'hello world'")
connection.close()

消费者源码 receive.py:

import pika

credentials = pika.PlainCredentials('admin','admin-pass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.170.21', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

先执行消费者应用程序,再执行生产者应用程序:

python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Hello World!'

python send.py
# => [x] Sent 'Hello World!'

3.其他

本文主要介绍了RabbitMQ的基本使用方法:建立连接、声明队列、生产/消费消息以及关闭连接内容。

参考:https://www.rabbitmq.com/getstarted.html

参考:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

参考:https://dulishu.top/category/rabbitmq/

未经允许不得转载:江哥架构师笔记 » rabbitmq学习(1):Hello World

分享到:更多 ()

评论 抢沙发

评论前必须登录!