路漫漫其修远兮
吾将上下而求索

graylog配置:zookeeper,kafka,filebeat安装

Zookeeper集群

配置10.16.15.192/10.16.15.193/10.16.15.194的zookeeper集群

1.安装Zookeeper
zookeeper官网:  http://zookeeper.apache.org/
这里要注意,请下载XXX版,无需安装解压可用,千万不要下载错了。
# zookeeper依赖于java,前面已经安装了,这里就不在强调了
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.9.tar.gz
# 解压到/usr/local目录下
tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local
ln -s /usr/local/zookeeper-3.4.9 /usr/local/zookeeper
# 编写配置文件
vim /usr/local/zookeeper-3.4.9/conf/zoo.cfg
grep -Ev '(^$|#)' /usr/local/zookeeper-3.4.9/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
server.1=zk001:2888:2777
server.2=zk002:2888:2777
server.3=zk003:2888:2777

同步配置文件到其他两台节点
zookeeper集群,每个节点的配置都是一样的,不需要做任何更改,不熟悉的zookeeper的小伙伴,可以参考:
scp zoo.cfg 10.16.15.193:/usr/local/zookeeper-3.4.9/conf/zoo.cfg
scp zoo.cfg 10.16.15.194:/usr/local/zookeeper-3.4.9/conf/zoo.cfg

2.创建myid
# 10.16.15.192
echo 1 >/usr/local/zookeeper-3.4.9/data/myid
# 10.16.15.193
echo 2 >/usr/local/zookeeper-3.4.9/data/myid
# 10.16.15.194
echo 3 >/usr/local/zookeeper-3.4.9/data/myid

3.启动服务 & 查看状态
# 10.16.15.192
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader
# 10.16.15.193
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
# 10.16.15.194
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower

Kafka集群

配置kafka broker集群

Kafka官网:  http://kafka.apache.org/

1.安装Kfaka
# 下载免安装版本,Binary 版本,记得自己看清楚
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
tar -zxvf kafka_2.12-0.10.2.1.tgz -C /usr/local
ln -s /usr/local/kafka_2.12-0.10.2.1 /usr/local/kafka

2.修改配置文件
# 10.16.15.192 节点
[root@zk001 config]# grep -Ev '(^$|#)' server.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://10.16.15.192:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data/kafka-logs
num.partitions=20
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.16.15.192:2181,10.16.15.193:2181,10.16.15.194:2181
zookeeper.connection.timeout.ms=6000
同步配置文件到10.16.15.193/10.16.15.194,内容基本相同,只需要修改一下broker.id和listeners

# 同步配置文件
scp server.properties 10.16.15.193:/usr/local/kafka/kafka_2.11-0.10.0.1/config/
scp server.properties 10.16.15.194:/usr/local/kafka/kafka_2.11-0.10.0.1/config/
# 修改broker.id和listeners
# 10.16.15.193
broker.id=2
listeners=PLAINTEXT://10.16.15.193:9092
# 10.16.15.194
broker.id=3
listeners=PLAINTEXT://10.16.15.194:9092

3.配置对应IP解析
# 10.16.15.192
vim /etc/hosts
10.16.15.192    zk001
10.16.15.193    zk002
10.16.15.194    zk003
# 其他两台的的hosts配置也是一样的,记得同步

4.启动kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 
# 其他两个节点的服务启动方式是一样的
到此,kakfa+zookeeper集群搭建完成。

友情赠送:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  # 创建topic
bin/kafka-topics.sh --list --zookeeper localhost:2181   # 查看已经创建的topic列表
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  # 查看topic的详细信息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 发送消息, 回车后模拟输入一下消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # 消费消息, 可以换到其他kafka节点, 同步接收生产节点发送的消息
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 6  # 给topic增加分区
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1  # 删除已经创建的topic, 前提是开了delete.topic.enable=true参数
如果还不能删除, 可以到zookeeper中去干掉它
cd /usr/local/zookeeper-3.4.9/
bin/zkCli.sh
ls /brokers/topics            # 查看topic
rmr /brokers/topics/test1     # 删除topic

filebeat安装

当前的情况,公司有多个中心,每个中心有多个部门,每个部门有多个业务线,每个业务使用的语言有java和php,每个服务器上的日志格式不同

这里日志分为两大类:一类是java调用开源的sdk(gelfLog),并且一个业务的日志字段固定,输出json数据到日志文件中,让filebeat读取。

一类是不好协调研发,比如php,和一些java 还是按照以前的方式打印日志输出到日志文件中,让filebeat读取。

安装filebeat

https://www.elastic.co/downloads

选择filebeat可以使用rpm,也可以使用tar包,这里使用tar包进行部署,使用版本:filebeat-6.4.2-linux-x86_64

https://www.elastic.co/downloads/beats/filebeat

配置说明

使用下面的配置文件进行测试

[root@graylog_d_47_133 applogs]# vim /usr/local/filebeat/filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    env: test
  fields_under_root: true
  json:
    keys_under_root: true
    add_error_key: true
  ignore_older: 10m


processors:


output.kafka:
  enabled: true
  hosts: ["kafka.graylog.ziroom.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1


logging:
  metrics.period: 60s
  to_files: true

日志文件输入以下日志

{"host":"host1","short_message":"this is a test","full_message":"this is full_message","version": "1.1","facility":"www.abc.com"}

kafka输出内容

{"@timestamp":"2018-10-26T10:17:44.834Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"source":"/applogs/a.log","offset":130,"log_topic":"test","host":{"name":"graylog_d_47_133"},"short_message":"this is a test","facility":"www.abc.com","env":"test","beat":{"hostname":"graylog_d_47_133","version":"6.4.2","name":"graylog_d_47_133"},"full_message":"this is full_message","version":"1.1"}

除了自己的字段外,filebeat还会添加自己的一些字段

kafka中的数据输入到graylog中时,有些字段是必须的,具体查看下面的配置

http://docs.graylog.org/en/2.4/pages/gelf.html#gelf-payload-specification

这里修改配置,转换一些字段,丢弃不需要的一些字段

filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test 
    env: test 
  fields_under_root: true
  json:
    keys_under_root: true
    add_error_key: true
  ignore_older: 10m


processors:
- rename:
    fields:
     - from: "host.name"
       to: "hostname"
    ignore_missing: false
    fail_on_error: true

- drop_fields: # discard unrequired fields normally injected by filebeats
    fields: ["host", "beat", "offset"]

- rename:
    fields:
     - from: "hostname"
       to: "host"
    ignore_missing: false
    fail_on_error: true


output.kafka:
  enabled: true
  hosts: ["kafka.graylog.ziroom.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1


logging:
  metrics.period: 60s
  to_files: true

kafka输出内容

{"@timestamp":"2018-10-26T10:53:47.327Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"full_message":"this is full_message","version":"1.1","source":"/applogs/a.log","env":"test","log_topic":"test","host":"graylog_d_47_133","short_message":"this is a test","facility":"www.abc.com"}

当日志为平常的php日志的时候,即日志内容不是json格式

下面是刚开始的配置内容

filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test 
    version : "1.1"
    short_message : from_filebeat
    facility: www.abc.com
    env: test 
  fields_under_root: true
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s
  ignore_older: 10m


output.kafka:
  enabled: true
  hosts: ["kafka.graylog.ziroom.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1


logging:
  metrics.period: 60s
  to_files: true

写入示例日志到日志文件中

echo "10.0.0.1 – time:10|this is a test" >> /applogs/a.log

kafka输出内容

{"@timestamp":"2018-10-26T15:59:47.639Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"offset":0,"short_message":"from_filebeat","env":"test","beat":{"name":"graylog_d_47_133","hostname":"graylog_d_47_133","version":"6.4.2"},"host":{"name":"graylog_d_47_133"},"source":"/applogs/a.log","message":"10.0.0.1 – time:10|this is a test","facility":"www.abc.com","log_topic":"test","version":"1.1"}

和上面一样,我们需要将不必要的字段去除掉

下面是修改后的配置文件

filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test 
    version : "1.1"
    short_message : from_filebeat
    facility: www.abc.com
    env: test 
  fields_under_root: true
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s
  ignore_older: 10m


processors:
- rename:
    fields:
     - from: "message"
       to: "full_message"
     - from: "host.name"
       to: "hostname"
    ignore_missing: false
    fail_on_error: true
        
- drop_fields: # discard unrequired fields normally injected by filebeats
    fields: ["host", "beat", "source", "offset"]
        
- rename:
    fields:
     - from: "hostname"
       to: "host"
    ignore_missing: false
    fail_on_error: true


output.kafka:
  enabled: true
  hosts: ["kafka.graylog.ziroom.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1


logging:
  metrics.period: 60s
  to_files: true

写入示例日志到日志文件中

echo "10.0.0.1 – time:10|this is a test" >> /applogs/a.log

kafka输出内容

{"@timestamp":"2018-10-26T16:04:45.066Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"log_topic":"test","facility":"www.abc.com","full_message":"10.0.0.1 – time:11|this is a test","host":"graylog_d_47_133","env":"test","version":"1.1","short_message":"from_filebeat"}

其他配置说明

1、输入的内容会原模原样的输出来,不管是引号,换行符,还是json格式文档

echo 'this is a test "aaabbbccc"\nhello\thi\nhello' >> test.log

从kafka中输出的内容中,原有的一行数据都是一个字符串,保持原样作为一个字段,保存在message中

{"@timestamp":"2018-10-12T03:02:07.785Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"this is a test \"aaabbbccc\"\\nhello\\thi\\nhello","type":"log"}

2、换行符说明

下面的换行符会默认被识别为三行日志,而不是一个整体

echo 'this is a test "aaabbbccc"

hello hi

 hello' >> test.log

输出

{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"this is a test \"aaabbbccc\"","type":"log"}

{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"hellohi","type":"log"}

{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":" hello","type":"log"}

3、多行匹配

php多行日志示例,后面配置中的正则表示以哪个被正则匹配到的字符作为新的起始行,也就是说在程序里面要固定,以“[11-Oct-2018 21:30:19]”作为每行的开始,只要不以这个形式开始,就被认为是上一行的内容,还没有结束。  

echo "

[11-Oct-2018 21:30:19]  [pool www] pid 16070

script_filename = /home/www/helios/inside/web/index.php

[0x00007ffeb4ef9908] execute() /home/www/helios/vendor/yiisoft/yii2/db/Command.php:900

[0x00007ffeb4ef8f38] queryInternal() /home/www/helios/vendor/web/php-yii2-db/src/Command.php:48

" >> test.log

可通过下面的配置来进行区分多行

- input_type: log
  paths:
    - /applogs/*.log
  fields:
    logIndex: nginx
    docType: nginx-access
    log_topic: applogs
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s

java多行示例

echo "

[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}

MapperParsingException[Field name [events.created] cannot contain '.']

    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)

" >> test.log

配置

- input_type: log
  paths:
    - /tmp/test.log
  multiline:
    pattern: '^\['
    negate: true
    match: after
	timeout: 5s

4、当服务部署到新机器后,新机器对应目录里面有日志文件,日志文件修改时间离现在超过10m的,将被忽略。如果没有这个参数,则将目录下面的所有文件都传过去,不管时间是什么时候

ignore_older: 10m

5、类似logstash,为了防止重复处理日志,filebeat也会记录处理进度到文件data/registry

为了测试可以先停止filebeat,清空文件registry,然后再启动就会重复处理了 

6、如果日志的格式直接是json格式,增加下面的配置字段,会直接将此json传过去,并在这个json中增加自定义的字段

- input_type: log
  paths:
    - /applogs/*.log
  json:
    keys_under_root: true
    add_error_key: true

7、自定义变量说明,当有多个日志目录,每个日志目录属于不同的业务,想把不同目录的日志内容发送到kafka不同的topic,可参看下面的方式

filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test 
    version : "1.1"
    short_message : from_filebeat
    facility: www.abc.com
    env: test 
  fields_under_root: true
- input_type: log
  paths:
    - /aaa/*.log
  fields:
    log_topic: test1
    version : "1.1"
    short_message : from_filebeat
    facility: www.bcd.com
    env: test 
  fields_under_root: true

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.ziroom.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1

8、已删除文件,filebeat不释放,添加下面配置。在配置文件中添加close_timeout: 20m,保证每隔20分钟file handler被关闭,不管是否遇到EOF符号。

- type: log
   paths:
   ...
   force_close_files: true
   close_timeout: 20m

安装filebeat

使用配置,建议查看

https://www.elastic.co/guide/en/beats/filebeat/current/index.html

https://cloud.tencent.com/developer/article/1006051

里面的配置供参考

https://blog.yuzunzhi.com/filebeat%E9%85%8D%E7%BD%AE%E8%AF%A6%E8%A7%A3/

官方的选项介绍

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

kafka和zookeeper参考

https://my.oschina.net/xuesong0204/blog/919760

http://blog.51cto.com/tchuairen/1855090

http://blog.51cto.com/tchuairen/1861167

未经允许不得转载:江哥架构师笔记 » graylog配置:zookeeper,kafka,filebeat安装

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址