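When Filebeat ships directly to Logstash, the pipeline can block because of the way Logstash is designed, so a message queue is placed between the two to decouple them.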

Either Redis or Kafka can serve as the queue; this setup uses Kafka.

1, Installation

Kafka runs straight from the unpacked archive. It needs ZooKeeper, and the distribution bundles one that can be used as is.

1), Start the bundled ZooKeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

2), Edit the Kafka configuration file

vim ./config/server.properties

############################# Server Basics #############################
broker.id=
delete.topic.enable=true

############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=
num.io.threads=
socket.send.buffer.bytes=
socket.receive.buffer.bytes=
socket.request.max.bytes=

############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=
num.recovery.threads.per.data.dir=

############################# Log Flush Policy #############################
log.flush.interval.messages=
log.flush.interval.ms=

############################# Log Retention Policy #############################
log.retention.hours=
log.segment.bytes=
log.retention.check.interval.ms=

############################# Zookeeper #############################
zookeeper.connect=localhost:
zookeeper.connection.timeout.ms=
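
Before starting the broker it can help to confirm which values are actually in effect, since commented-out lines are easy to misread. The sketch below is a small bash helper for that; `get_prop` is a hypothetical name introduced here and is not part of the Kafka distribution, and it assumes simple `key=value` lines without line continuations.

```shell
#!/usr/bin/env bash
# get_prop FILE KEY: print the value of KEY from a Java-style properties file.
# Hypothetical helper for illustration; assumes plain key=value lines.
get_prop() {
  local file=$1 key=$2
  # Skip comment lines, match the key exactly, and keep the last occurrence,
  # because a later duplicate overrides an earlier one when the file is loaded.
  awk -F= -v k="$key" '
    /^[ \t]*#/ { next }
    $1 == k    { sub(/^[^=]*=/, ""); value = $0 }
    END        { print value }
  ' "$file"
}
```

For example, `get_prop ./config/server.properties listeners` should print the listener address that Filebeat and Logstash have to reach, and an empty line means the key is not set.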

3), Start the Kafka server

./bin/kafka-server-start.sh ./config/server.properties &
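
Because the broker is started in the background, a quick check that it is accepting connections avoids confusing errors in the later steps. The following is a minimal sketch that assumes bash with `/dev/tcp` support and the coreutils `timeout` command; `port_open` is a hypothetical helper, and 9092 is the port from the `listeners` setting.

```shell
#!/usr/bin/env bash
# port_open HOST PORT: succeed if a TCP connection to HOST:PORT can be opened.
# Hypothetical helper for illustration.
port_open() {
  local host=$1 port=$2
  # bash opens a TCP socket when redirecting to /dev/tcp/HOST/PORT;
  # timeout(1) bounds the attempt to 2 seconds.
  timeout 2 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null
}

if port_open 127.0.0.1 9092; then
  echo "Kafka broker is accepting connections on 9092"
else
  echo "nothing is listening on 9092 yet"
fi
```

The same function can be pointed at the ZooKeeper port to verify step 1).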

4), Edit the Filebeat configuration; the final version:

cat ./elk/filebeat-5.5.-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats
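
The `topic` setting is a format string: Filebeat substitutes the event's `type` field, which `document_type` sets, so the topic name depends on the prospector. The sketch below imitates that substitution in bash to show which topic name to expect; `resolve_topic` is a hypothetical helper, not Filebeat code, and it handles only the `%{[type]}` placeholder.

```shell
#!/usr/bin/env bash
# resolve_topic TEMPLATE TYPE: substitute the %{[type]} placeholder.
# Hypothetical helper that only imitates Filebeat's format string for one field.
resolve_topic() {
  local template=$1 type=$2
  local placeholder='%{[type]}'
  local resolved
  # Quoting "$placeholder" makes bash match it literally, not as a glob pattern.
  resolved=${template//"$placeholder"/"$type"}
  printf '%s\n' "$resolved"
}

topic=$(resolve_topic 'elk-%{[type]}' 'my-nginx-log')
echo "$topic"   # prints elk-my-nginx-log

# Check the name against the topics_pattern used on the Logstash side.
if [[ $topic =~ ^elk-.*$ ]]; then
  echo "matched by elk-.*"
fi
```

With `document_type: my-nginx-log` the events should therefore land in `elk-my-nginx-log`. A topic named `elk-log` is what the same template would yield for the type `log`.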

5), Restart Filebeat

./filebeat -c ./filebeat.yml &

6), Change the Logstash input

input {
  kafka {
    #codec => "json"
    topics_pattern => "elk-.*"
    bootstrap_servers => "127.0.0.1:9092"
    auto_offset_reset => "latest"
    group_id => "logstash-g1"
  }
}
output {
  elasticsearch {                              # Logstash writes to Elasticsearch
    hosts => ["localhost:9200"]                # Elasticsearch runs locally
    index => "logstash-nginx-%{+YYYY.MM.dd}"   # index to create
    document_type => "nginx"                   # document type
    workers =>                                 # number of workers
    user => elastic                            # Elasticsearch user
    password => changeme                       # Elasticsearch password
    flush_size =>
    idle_flush_time =>
  }
}
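
The `index` setting produces one index per day, and the `%{+YYYY.MM.dd}` part is filled in from the event's `@timestamp` in UTC. To know which index to query for a given event, the name can be reproduced as in this sketch. It assumes GNU `date` (the `-d` option); `index_for` is a hypothetical helper and the timestamp is made up for illustration.

```shell
#!/usr/bin/env bash
# index_for TIMESTAMP: print the daily index name for an ISO 8601 timestamp.
# Hypothetical helper; relies on GNU date, BSD/macOS date uses different flags.
index_for() {
  local ts=$1
  # -u converts to UTC first, mirroring how the %{+...} pattern is evaluated.
  date -u -d "$ts" +'logstash-nginx-%Y.%m.%d'
}

index_for '2017-05-11T13:04:00Z'   # prints logstash-nginx-2017.05.11
```

Events logged shortly after local midnight in a timezone ahead of UTC can end up in the previous day's index, which is worth remembering when a search for "today" comes back empty.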

7), Restart Logstash

8), Request a page from nginx, then inspect the messages in the queue

./bin/kafka-console-consumer.sh --zookeeper localhost: --topic elk-log --from-beginning

Reference: http://www.ywnds.com/?p=9776

05-11 13:04