Zookeeper cluster
Set up a zookeeper cluster on 10.16.15.192 / 10.16.15.193 / 10.16.15.194.
1. Install Zookeeper
Zookeeper website: http://zookeeper.apache.org/
Note: download the binary (extract-and-run) release, which needs no installation; be careful not to grab the wrong package.

# zookeeper depends on java, which was installed earlier, so that step is not repeated here
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
# extract to /usr/local
tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local
ln -s /usr/local/zookeeper-3.4.9 /usr/local/zookeeper
# create the data and log directories referenced below in zoo.cfg
mkdir -p /usr/local/zookeeper/data /usr/local/zookeeper/logs
# write the configuration file
vim /usr/local/zookeeper-3.4.9/conf/zoo.cfg

grep -Ev '(^$|#)' /usr/local/zookeeper-3.4.9/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
server.1=zk001:2888:2777
server.2=zk002:2888:2777
server.3=zk003:2888:2777

Sync the configuration file to the other two nodes. In a zookeeper cluster every node uses exactly the same configuration, so nothing needs to be changed.

scp zoo.cfg 10.16.15.193:/usr/local/zookeeper-3.4.9/conf/zoo.cfg
scp zoo.cfg 10.16.15.194:/usr/local/zookeeper-3.4.9/conf/zoo.cfg

2. Create myid
# 10.16.15.192
echo 1 >/usr/local/zookeeper-3.4.9/data/myid
# 10.16.15.193
echo 2 >/usr/local/zookeeper-3.4.9/data/myid
# 10.16.15.194
echo 3 >/usr/local/zookeeper-3.4.9/data/myid

3. Start the service & check its status
# 10.16.15.192
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader
# 10.16.15.193
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
# 10.16.15.194
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower
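To double-check each node's role from the command line, ZooKeeper's built-in four-letter commands can be queried over the client port. A minimal sketch, assuming nc (netcat) is installed on the hosts:

# "ruok" returns "imok" when the server is running
echo ruok | nc 10.16.15.192 2181
# "stat" prints the version, connection counts and the Mode (leader/follower) of each node
echo stat | nc 10.16.15.192 2181
echo stat | nc 10.16.15.193 2181
echo stat | nc 10.16.15.194 2181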
Kafka cluster
Set up the kafka broker cluster.
Kafka website: http://kafka.apache.org/
1. Install Kafka
# download the binary (no-install) release; make sure you pick the Binary package
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
tar -zxvf kafka_2.12-0.10.2.1.tgz -C /usr/local
ln -s /usr/local/kafka_2.12-0.10.2.1 /usr/local/kafka

2. Edit the configuration file
# 10.16.15.192 node
[root@zk001 config]# grep -Ev '(^$|#)' server.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://10.16.15.192:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/data/kafka-logs
num.partitions=20
num.recovery.threads.per.data.dir=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.16.15.192:2181,10.16.15.193:2181,10.16.15.194:2181
zookeeper.connection.timeout.ms=6000

Sync the configuration file to 10.16.15.193 / 10.16.15.194. The content is almost identical; only broker.id and listeners need to be changed.

# sync the configuration file
scp server.properties 10.16.15.193:/usr/local/kafka/config/
scp server.properties 10.16.15.194:/usr/local/kafka/config/
# change broker.id and listeners
# 10.16.15.193
broker.id=2
listeners=PLAINTEXT://10.16.15.193:9092
# 10.16.15.194
broker.id=3
listeners=PLAINTEXT://10.16.15.194:9092

3. Configure hostname resolution for the node IPs
# 10.16.15.192
vim /etc/hosts
10.16.15.192 zk001
10.16.15.193 zk002
10.16.15.194 zk003
# the hosts file on the other two machines is identical; remember to sync it

4. Adjust the JVM parameters
vim kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
fi

5. Start kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# the other two nodes are started the same way

With that, the kafka + zookeeper cluster is complete.
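A quick end-to-end check of the new cluster is to create a small topic and push a message through it with the console tools that ship with kafka. A minimal sketch; the topic name cluster-check is only an example:

cd /usr/local/kafka/bin
# create a replicated test topic and confirm leaders/replicas are spread across the brokers
./kafka-topics.sh --create --zookeeper 10.16.15.192:2181 --replication-factor 3 --partitions 3 --topic cluster-check
./kafka-topics.sh --describe --zookeeper 10.16.15.192:2181 --topic cluster-check
# produce one message against one broker and consume it from another
echo "hello kafka" | ./kafka-console-producer.sh --broker-list 10.16.15.192:9092 --topic cluster-check
./kafka-console-consumer.sh --bootstrap-server 10.16.15.193:9092 --topic cluster-check --from-beginning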
–
Installing filebeat
Current situation: the company has multiple centers, each center has multiple departments, each department has several business lines, the services are written in Java and PHP, and the log format differs from server to server.
The logs fall into two broad categories. The first: Java services that use an open-source SDK (gelfLog); each service has a fixed set of log fields and writes JSON records to a log file for filebeat to read.
The second: teams that are hard to coordinate with (PHP, and some Java services) that still log the old way, writing plain text to log files for filebeat to read.
Install filebeat
https://www.elastic.co/downloads
Filebeat can be installed either from an rpm or from a tar package; the tar package is used here. Version: filebeat-6.4.2-linux-x86_64. A download-and-extract sketch follows the link below.
https://www.elastic.co/downloads/beats/filebeat
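A minimal sketch of the tar-based install. The download URL follows elastic's standard artifact naming, and the /usr/local/filebeat symlink is an assumption chosen to match the paths used later in this post:

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.4.2-linux-x86_64.tar.gz
tar -zxvf filebeat-6.4.2-linux-x86_64.tar.gz -C /usr/local
ln -s /usr/local/filebeat-6.4.2-linux-x86_64 /usr/local/filebeat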
Configuration notes
Use the following configuration file for testing.
[root@graylog_d_47_133 applogs]# vim /usr/local/filebeat/filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    env: test
  fields_under_root: true
  json:
    keys_under_root: true
    add_error_key: true
  ignore_older: 10m

processors:

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.andblog.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1

logging:
  metrics.period: 60s
  to_files: true
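Before pointing it at real logs, the configuration and the kafka connection can be verified with filebeat's built-in test subcommands, and the shipper can be run in the foreground to watch what it does. A short sketch, assuming the tar install under /usr/local/filebeat:

cd /usr/local/filebeat
./filebeat test config -c filebeat.yml    # syntax-check the configuration
./filebeat test output -c filebeat.yml    # check connectivity to the kafka output
./filebeat -e -c filebeat.yml             # run in the foreground, logging to stderr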
Write the following line into the log file:
{"host":"host1","short_message":"this is a test","full_message":"this is full_message","version": "1.1","facility":"www.abc.com"}
Content seen in kafka:
{"@timestamp":"2018-10-26T10:17:44.834Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"source":"/applogs/a.log","offset":130,"log_topic":"test","host":{"name":"graylog_d_47_133"},"short_message":"this is a test","facility":"www.abc.com","env":"test","beat":{"hostname":"graylog_d_47_133","version":"6.4.2","name":"graylog_d_47_133"},"full_message":"this is full_message","version":"1.1"}
Besides the fields we set ourselves, filebeat adds a number of fields of its own.
When the data in kafka is fed into graylog, certain fields are mandatory; see the GELF payload specification:
http://docs.graylog.org/en/2.4/pages/gelf.html#gelf-payload-specification
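For reference, a minimal GELF 1.1 payload looks roughly like this; version, host and short_message are the mandatory fields, the rest are optional:

{
  "version": "1.1",
  "host": "example.org",
  "short_message": "A short message",
  "full_message": "Backtrace here\n\nmore stuff",
  "timestamp": 1385053862.3072,
  "level": 1
}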
The configuration is therefore adjusted to rename some fields and drop the ones that are not needed.
filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    env: test
  fields_under_root: true
  json:
    keys_under_root: true
    add_error_key: true
  ignore_older: 10m

processors:
- rename:
    fields:
      - from: "host.name"
        to: "hostname"
    ignore_missing: false
    fail_on_error: true
- drop_fields:
    # discard unrequired fields normally injected by filebeats
    fields: ["host", "beat", "offset"]
- rename:
    fields:
      - from: "hostname"
        to: "host"
    ignore_missing: false
    fail_on_error: true

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.andblog.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1

logging:
  metrics.period: 60s
  to_files: true
Content seen in kafka:
{"@timestamp":"2018-10-26T10:53:47.327Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"full_message":"this is full_message","version":"1.1","source":"/applogs/a.log","env":"test","log_topic":"test","host":"graylog_d_47_133","short_message":"this is a test","facility":"www.abc.com"}
When the log is an ordinary php log, i.e. the content is not JSON:
Here is the initial configuration.
filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    version: "1.1"
    short_message: from_filebeat
    facility: www.abc.com
    env: test
  fields_under_root: true
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s
  ignore_older: 10m

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.andblog.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1

logging:
  metrics.period: 60s
  to_files: true
Write a sample line into the log file:
echo "10.0.0.1 – time:10|this is a test" >> /applogs/a.log
Content seen in kafka:
{"@timestamp":"2018-10-26T15:59:47.639Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"offset":0,"short_message":"from_filebeat","env":"test","beat":{"name":"graylog_d_47_133","hostname":"graylog_d_47_133","version":"6.4.2"},"host":{"name":"graylog_d_47_133"},"source":"/applogs/a.log","message":"10.0.0.1 – time:10|this is a test","facility":"www.abc.com","log_topic":"test","version":"1.1"}
As above, we need to strip out the unnecessary fields.
Here is the modified configuration file.
filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    version: "1.1"
    short_message: from_filebeat
    facility: www.abc.com
    env: test
  fields_under_root: true
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s
  ignore_older: 10m

processors:
- rename:
    fields:
      - from: "message"
        to: "full_message"
      - from: "host.name"
        to: "hostname"
    ignore_missing: false
    fail_on_error: true
- drop_fields:
    # discard unrequired fields normally injected by filebeats
    fields: ["host", "beat", "source", "offset"]
- rename:
    fields:
      - from: "hostname"
        to: "host"
    ignore_missing: false
    fail_on_error: true

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.andblog.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1

logging:
  metrics.period: 60s
  to_files: true
Write a sample line into the log file:
echo "10.0.0.1 – time:10|this is a test" >> /applogs/a.log
Content seen in kafka:
{"@timestamp":"2018-10-26T16:04:45.066Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.2","topic":"test"},"log_topic":"test","facility":"www.abc.com","full_message":"10.0.0.1 – time:11|this is a test","host":"graylog_d_47_133","env":"test","version":"1.1","short_message":"from_filebeat"}
Other configuration notes
1. The input is forwarded exactly as it is, whether it contains quotes, newline characters, or a JSON document.
echo 'this is a test "aaabbbccc"\nhello\thi\nhello' >> test.log
In the data read back from kafka, each original line is kept unchanged as a single string and stored in the message field.
{"@timestamp":"2018-10-12T03:02:07.785Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"this is a test \"aaabbbccc\"\\nhello\\thi\\nhello","type":"log"}
2. Line breaks
The real line breaks below are recognized as three separate log events by default, not as one whole.
echo 'this is a test "aaabbbccc"
hello hi
hello' >> test.log
Output:
{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"this is a test \"aaabbbccc\"","type":"log"}
{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":"hellohi","type":"log"}
{"@timestamp":"2018-10-12T03:21:49.123Z","fields":{"docType":"nginx-access","logIndex":"nginx","log_topic":"applogs"},"message":" hello","type":"log"}
3. Multiline matching
A php multi-line log example. The regex in the configuration below defines which lines start a new event, so the application has to be consistent: every entry must begin in the form "[11-Oct-2018 21:30:19]", and any line that does not start that way is treated as a continuation of the previous, still-unfinished entry.
echo "
[11-Oct-2018 21:30:19] [pool www] pid 16070
script_filename = /home/www/helios/inside/web/index.php
[0x00007ffeb4ef9908] execute() /home/www/helios/vendor/yiisoft/yii2/db/Command.php:900
[0x00007ffeb4ef8f38] queryInternal() /home/www/helios/vendor/web/php-yii2-db/src/Command.php:48
" >> test.log
The following configuration groups these lines into multi-line events:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    logIndex: nginx
    docType: nginx-access
    log_topic: applogs
  multiline.pattern: '^\[\d{2}\-'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5s
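A quick way to sanity-check the pattern outside filebeat is to run it against the first line of a sample entry with grep; note that extended regex syntax has no \d, so [0-9] stands in for it here:

echo '[11-Oct-2018 21:30:19] [pool www] pid 16070' | grep -E '^\[[0-9]{2}-'
# the line is printed, so it matches and would start a new event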
A java multi-line example:
echo "
[2016-05-25 12:39:04,744][DEBUG][action.bulk ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}
MapperParsingException[Field name [events.created] cannot contain '.']
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
" >> test.log
Configuration:
- input_type: log
  paths:
    - /tmp/test.log
  multiline:
    pattern: '^\['
    negate: true
    match: after
    timeout: 5s
4. When the service is deployed on a new machine whose log directory already contains files, any file whose modification time is more than 10m in the past is ignored. Without this option, every file in the directory would be shipped, no matter how old it is.
ignore_older: 10m
5. Like logstash, filebeat records its progress in the file data/registry so that logs are not processed twice.
For testing you can stop filebeat, empty the registry file, and start it again; the logs will then be re-processed, as in the sketch below.
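A rough sketch of that reset, assuming the tar install lives in /usr/local/filebeat and filebeat is running in the background:

pkill -f /usr/local/filebeat/filebeat        # stop filebeat
rm -f /usr/local/filebeat/data/registry      # forget all recorded file offsets
cd /usr/local/filebeat && nohup ./filebeat -c filebeat.yml >/dev/null 2>&1 &   # start again; matched files are re-read from the beginning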
6. If the log lines are already JSON, add the options below; the JSON is forwarded as-is, with the custom fields merged into it.
- input_type: log
  paths:
    - /applogs/*.log
  json:
    keys_under_root: true
    add_error_key: true
7. Custom fields. When there are several log directories, each belonging to a different business line, and the logs from different directories should go to different kafka topics, the following approach can be used.
filebeat.prospectors:
- input_type: log
  paths:
    - /applogs/*.log
  fields:
    log_topic: test
    version: "1.1"
    short_message: from_filebeat
    facility: www.abc.com
    env: test
  fields_under_root: true
- input_type: log
  paths:
    - /aaa/*.log
  fields:
    log_topic: test1
    version: "1.1"
    short_message: from_filebeat
    facility: www.bcd.com
    env: test
  fields_under_root: true

output.kafka:
  enabled: true
  hosts: ["kafka.graylog.andblog.com:9092"]
  topic: '%{[log_topic]}'
  partition.round_robin:
    reachable_only: true
  max_message_bytes: 1000000
  required_acks: 1
8. If filebeat keeps handles open on files that have already been deleted, add the configuration below. close_timeout: 20m guarantees that every file handler is closed after 20 minutes, whether or not EOF has been reached.
- type: log
  paths:
    ...
  force_close_files: true
  close_timeout: 20m
–
filebeat installation
For usage and configuration, it is recommended to check:
https://www.elastic.co/guide/en/beats/filebeat/current/index.html
https://cloud.tencent.com/developer/article/1006051
The configuration there can be used for reference.
Official option reference:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
kafka and zookeeper references:
https://my.oschina.net/xuesong0204/blog/919760
http://blog.51cto.com/tchuairen/1855090
http://blog.51cto.com/tchuairen/1861167