路漫漫其修远兮
吾将上下而求索

python学习:rabbitmq长连接超时重连

下面程序的思路,当和rabbitmq建立连接后,同时启动一个线程来检测这个链接,当这个线程检测到链接中断后,重新建立一个链接,从而达到自动重连的机制

其他程序超时重连也可以参考这个原理进行处理

#!/usr/bin/env python
#coding=utf-8
# pip install -I -U service_identity
# pip install pika==0.10.0
#import traceback
import sys
import logging
import time
import threading
import pika
'''
如果用了basicConfig那么所有模块的日志都会打印
logging.basicConfig(
    level = logging.INFO,
    #format = "%(asctime)s [%(levelname)-8s] %(message)s"
    format = "%(asctime)s [%(levelname)s] %(message)s"
)
'''

logger = logging.getLogger(__name__)
'''
logger.setLevel(level = logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(name)s] [%(levelname)s] %(message)s')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
'''
        
class RabbitMQClient(object):

    def __init__(self, sv_addr, sv_port, username, passwd):

        self.credentials = pika.PlainCredentials(username, passwd)
        self.sv_addr = sv_addr
        self.sv_port = int(sv_port)
        self.consume_info = {}
        self.consume_state = False

    def connect(self):
        try:
            self.conn = pika.BlockingConnection(
                        pika.ConnectionParameters(self.sv_addr, self.sv_port,
                        '/', self.credentials,heartbeat_interval=0))
            self.ch = self.conn.channel()
            t = threading.Thread(target=self.connect_check)
            t.start()
        except:
            '''
            这种方法比用logger的exc_info=True,多了exception类所在的代码路径
            ex_type, ex_val, ex_stack = sys.exc_info()
            logger.error("ex_type: %s" %ex_type)
            logger.error("ex_val: %s" %ex_val)
            for ex_stack_info in traceback.extract_tb(ex_stack):
                logger.error("stack_info: %s" %str(ex_stack_info))
            '''
            logger.error("connect rabbitmq failed:%s:%s, stop starting" %(self.sv_addr, self.sv_port))
            logger.error("Exception:", exc_info=True)
            while True:
                time.sleep(1)
                logger.error("connect rabbitmq failed:%s:%s, stop starting" %(self.sv_addr, self.sv_port))

    def reconnect(self):
        try:
            logger.error("reconnect rabbitmq: %s:%s" %(self.sv_addr, self.sv_port))
            self.conn = pika.BlockingConnection(
                        pika.ConnectionParameters(self.sv_addr, 
                        self.sv_port, '/', self.credentials))
            self.ch = self.conn.channel()
        except:
            logger.error("reconect failed, after 1s will reconnect")
            #logger.error("Exception:", exc_info=True)

    def connect_check(self):
        while True:
            try:
                time.sleep(5)
                logger.debug("connection state: %d" %int(self.conn.is_open))
                if not self.conn.is_open:
                    self.reconnect()
            except:
                logger.error("connect_check error!!!.")
                #logger.error("Exception:", exc_info=True)
                
    def publish(self, queue_name, queue_body):
        try:
            self.ch.basic_publish(exchange="", routing_key=queue_name, body=queue_body)
            logger.info("publish:%s" %queue_body)

        except:
            logger.error("publish error:")
            logger.error("Exception:", exc_info=True)

    def consume(self, queue_name, callback):
        try:
            if queue_name not in self.consume_info:
                self.consume_info[queue_name] = callback
        except:
            logger.error("Exception:", exc_info=True)

    def restart_consuming(self):
        for queue_name in self.consume_info:
            self.ch.queue_declare(queue=queue_name)
            self.ch.basic_consume(self.consume_info[queue_name], queue=queue_name, no_ack=True)
        logger.info("consume_info is %s" %str(self.consume_info))
        self.ch.connection.process_data_events(time_limit=1)
        self.ch.start_consuming()

    def start_consuming(self, noargs):
        while True:
            try:
                self.restart_consuming()
                time.sleep(1)
            except:
                logger.error("consuming failed, after 1s start_consuming again!")
                logger.error("Exception:", exc_info=True)
                time.sleep(1)

    def replace(self, old_queue, new_queue, callback):
        """
        @summary: 会停止所有consumer,重新订阅consume_info列表
        @param old_queue: str类型
        @param new_queue: str类型
        """
        try:
            del self.consume_info[old_queue]
            self.consume_info[new_queue] = callback
            self.ch.stop_consuming()
            for queue_name in self.consume_info:
                self.ch.queue_declare(queue=queue_name)
                self.ch.basic_consume(self.consume_info[queue_name], queue=queue_name, no_ack=True)
        except:
            logger.error("Exception:", exc_info=True)
        
        
if __name__ == "__main__":
    '''
    测试用例
    '''
    def callback(ch, method, properties, body):
        try:
            logger.info("Received: %r" %body)
        except:
            logger.error("callback failed")
            logger.error("Exception:", exc_info=True)
    rbclient = RabbitMQClient("192.168.1.87", 5673, "mq", "123456")
    rbclient.connect()
    rbclient_1 = RabbitMQClient("192.168.1.87", 5673, "mq", "123456")
    rbclient_1.connect()
    rbclient.consume("hello", callback)
    t = threading.Thread(target = rbclient.start_consuming, args=("noargs"))
    t.start()
    t1 = time.time()
    rbclient_1.publish("hello", "hello world!")
    rbclient_1.publish("hello", "hello world!")
    rbclient_1.publish("hello", "hello world!")
    time.sleep(10)
    print("--------relace------")
    rbclient.replace("hello", "hello1", callback)
    print("--------relace end------")
    while True:
        print("while publish")
        rbclient_1.publish("hello1", "hello1 world1!")
        rbclient_1.publish("hello", "hello world!")
        time.sleep(1)

未经允许不得转载:江哥架构师笔记 » python学习:rabbitmq长连接超时重连

分享到:更多 ()

评论 抢沙发

评论前必须登录!