路漫漫其修远兮
吾将上下而求索

Kafka学习:topic、consumer group状态命令

最近工作中遇到需要使用kafka的场景,测试消费程序启动后,要莫名的过几十秒乃至几分钟才能成功获取到到topic的partition和offset,而后开始消费数据,于是学习了一下查看kafka broker里topic和consumer group状态的相关命令,这里记录一下。

命令参考自《Kafka: The Definitive Guide》 Chapter 9 Administrating Kafka

以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap–server(即broker)地址为: 127.0.0.1:9292

创建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看kafka topic列表,使用–list参数

bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
lx_test_topic
test

查看kafka特定topic的详情,使用–topic与–describe参数

列出了lx_test_topic的parition数量、replica因子以及每个partition的leader、replica信息

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe
Topic:lx_test_topic     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: lx_test_topic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

发送接收消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 发送消息, 回车后模拟输入一下消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # 消费消息, 可以换到其他kafka节点, 同步接收生产节点发送的消息

增加分片,只能增加,不能减少

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 6

删除topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1  # 删除已经创建的topic, 前提是开了delete.topic.enable=true参数
如果还不能删除, 可以到zookeeper中去干掉它
cd /usr/local/zookeeper-3.4.9/
bin/zkCli.sh
ls /brokers/topics            # 查看topic
rmr /brokers/topics/test1     # 删除topic

查看consumer group列表,使用–list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap–server和zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
lx_test

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
console-consumer-86845console-consumer-11967

查看特定consumer group 详情,使用–group与–describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group lx_test --describe
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
lx_test                        lx_test_topic             0          465             465             0               kafka-python-1.3.1_/127.0.0.1

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-11967 --describe
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.
console-consumer-11967         lx_test_topic             0          unknown         465             unknown         console-consumer-11967_aws-lx-1513787888172-d3a91f05-0

其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。

模拟手动消费,指定group和topic

bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group  IM_STATISTICS_GROUP_PROD  --topic im_dialog_topic_ZIROOM_RENT_IM --from-beginning

kafka管理工具Kafka Eagle安装:

https://blog.csdn.net/ukakasu/article/details/81386410

未经允许不得转载:江哥架构师笔记 » Kafka学习:topic、consumer group状态命令

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址