1、安装Java
如果不会安装,详细步骤请点击链接
a、解压到/usr/local/jdk1.8
#创建jdk1.8目录
mkdir -p /usr/local/jdk1.8
#解压文件
tar -zxf jdk-8u391-linux-x64.tar.gz -C /usr/local/jdk1.8 --strip-components=1
b、配置环境变量
#配置环境变量
vi /etc/profile
#将以下代码输入到文件中
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
#保存退出后刷新环境变量
source /etc/profile
c、查看版本
java -version
2、Elasticsearch(存储日志)
如果不会安装,详细步骤请点击链接
a、解压到/data/elk/elasticsearch
#创建elasticsearch目录
mkdir -p /data/elk/elasticsearch
#解压文件
tar -zxf elasticsearch-7.17.7-linux-x86_64.tar.gz -C /data/elk/elasticsearch --strip-components=1
b、修改配置
#进入配置目录
cd /data/elk/elasticsearch/config/
#编辑主配置文件 elasticsearch.yml
vi elasticsearch.yml
cluster.name: my-application
node.name: node-1
node.master: true
node.data: true
path.data: /data/elk/elasticsearch/data
path.logs: /data/elk/elasticsearch/logs
bootstrap.memory_lock: true
network.host: 127.0.0.1
http.port: 9200
transport.port: 9300
discovery.type: single-node
discovery.seed_hosts: ["127.0.0.1"]
gateway.recover_after_nodes: 1
action.destructive_requires_name: true
cluster.routing.allocation.disk.threshold_enabled: false
cluster.max_shards_per_node: 10000
# 设置集群中节点最大分片数量,即单个节点可创建的最大索引数量。默认为1000。
## x-pack ###
xpack.security.enabled: true
#增加跨域配置
http.cors.enabled: true
http.cors.allow-origin: "*"
c、修改系统配置
#修改最大打开文件数
vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
* soft nproc 4096
* hard nproc 4096
#修改最大的链接数
vi /etc/sysctl.conf
vm.max_map_count=262144
d、创建运行用户
#创建用户
useradd -M elk
#设置密码
passwd elk
#授权目录
chown -R elk.elk /data/elk/elasticsearch
e、重启服务
reboot
f、切换用户启动
su elk
cd /data/elk/elasticsearch/bin
./elasticsearch -d
g、创建内置账号的密码
elastic,一个内置的超级用户。关联角色superuser。
apm_system,用于Elastic APM在Elasticsearch中存储监控信息时使用的用户。关联角色apm_system。
kibana,用于Kibana连接Elasticsearch并与之通信(已过时的内置用户)。关联角色kibana_system。
kibana_system,用户Kibana连接Elasticsearch并与之通信。关联角色kibana_system。
logstash_system,用于Logstash在Elasticsearch中存储监控信息时使用。关联角色logstash_system。
beats_system,用于Beats在Elasticsearch中存储监控信息时使用。关联角色beats_system。
remote_monitoring_user ,用于Metricbeat在Elasticsearch中收集和存储监控信息时使用。关联remote_monitoring_agent和remote_monitoring_collector角色
elasticsearch-setup-passwords interactive
h、使用账号访问Elasticsearch(列出所有索引)
curl -X GET -u elastic:elastic http://127.0.0.1:9200/_cat/indices?v
i、设置开机启动
#创建elasticsearch服务文件
touch /etc/systemd/system/elasticsearch.service
#编辑elasticsearch服务文件
vi /etc/systemd/system/elasticsearch.service
[Unit]
Description=Elasticsearch
After=network.target
[Service]
Type=simple
User=elk
LimitMEMLOCK=infinity
LimitNOFILE=65535
WorkingDirectory=/data/elk/elasticsearch
ExecStart=/data/elk/elasticsearch/bin/elasticsearch
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable elasticsearch
#重启
reboot
j、elasticsearch操作命令
systemctl start elasticsearch.service 启动
systemctl status elasticsearch.service 查看状态
systemctl restart elasticsearch.service 重启
systemctl stop elasticsearch.service 停止
k、有必要的情况下,开启防火墙
firewall-cmd --zone=public --add-port=9200/tcp --permanent #单机版的
firewall-cmd --zone=public --add-port=9300/tcp --permanent #集群版的
firewall-cmd --reload
firewall-cmd --list-ports
3、Kibana(日志搜索与展示)
a、解压到/data/elk/kibana
#创建kibana目录
mkdir -p /data/elk/kibana/logs
#解压文件
tar -zxf kibana-7.17.7-linux-x86_64.tar.gz -C /data/elk/kibana --strip-components=1
b、修改配置
#进入配置文件
cd /data/elk/kibana/config
#修改配置文件
vi kibana.yml
server.port: 5601
server.host: "127.0.0.1"
server.publicBaseUrl: "http://es.elk.com"
elasticsearch.hosts: ["http://127.0.0.1:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "elastic"
logging.dest: /data/elk/kibana/logs/kibana.log
i18n.locale: "zh-CN"
#创建日志文件
cd /data/elk/kibana/logs/
touch kibana.log
c、使用elk用户启动Kibana
chown -R elk.elk /data/elk/kibana/
cd /data/elk/kibana/
nohup sudo -u elk bin/kibana &
d、有必要的情况下,开启防火墙
firewall-cmd --zone=public --add-port=5601/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports
e、我采用的是nginx访问的
server {
listen 80;
server_name es.elk.com;
error_log /data/nginx/es.elk.com_error.log crit;
access_log /data/nginx/es.elk.com_acess_$logdate.log access-upstream;
error_page 403 /403.html;
error_page 404 /404.html;
error_page 500 502 503 504 /50x.html;
location / {
proxy_pass http://127.0.0.1:5601;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location = /403.html {
root "/data/wwwroot/error";
}
location = /404.html {
root "/data/wwwroot/error";
}
location = /50x.html {
root "/data/wwwroot/error";
}
}
f、开机启动
#创建kibana服务文件
touch /etc/systemd/system/kibana.service
#编辑kibana服务文件
vi /etc/systemd/system/kibana.service
[Unit]
Description=kibana
After=network.target
[Service]
Type=simple
User=elk
ExecStart=/data/elk/kibana/bin/kibana
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable kibana
#重启
reboot
g、kibana常用命令
systemctl start kibana.service 启动
systemctl status kibana.service 查看状态
systemctl restart kibana.service 重启
systemctl stop kibana.service 停止
4、Kafka(消息队列)
如果不会安装,详细步骤请点击链接
a、解压到/data/elk/kafka/
#创建目录
mkdir -p /data/elk/kafka
#解压文件
tar -zxf kafka_2.12-2.8.1.tgz -C /data/elk/kafka --strip-components=1
b、创建数据目录
cd /data/elk/kafka
mkdir kafka_data
mkdir zk_data
c、配置zookeeper
#进入配置文件目录
cd config/
#编辑zookeeper.properties
vi zookeeper.properties
dataDir=/data/elk/kafka/zk_data
# zookeeper的数据目录。
clientPort=2181
# zookeeper监听端口。
maxClientCnxns=0
# 最大客户端并发连接数量。
### SASL ###
# 开启ZookeeperClient支持SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
d、启动zookeeper
../bin/zookeeper-server-start.sh -daemon /data/elk/kafka/config/zookeeper.properties
e、配置Kafka服务端(Broker)
#编辑server.properties
vi server.properties
############################# Server Basics #############################
# BrokerID,用于Kafka集群中代理节点的全局唯一编号。
broker.id=0
# 设置单个消息的最大值,默认为1M。
message.max.bytes=10000000
# 设置允许对Topic进行删除操作,否则无法删除Topic。只有开启此项的时候Topic才会被真正删除,否则Topic仅只是被标记为删除而已。为了安全性考虑的结果。
# delete.topic.enble=true
# 设置当生产者要推送消息的Topic不存在时自动创建对应的Topic。默认为true。
# auto.create.topics.enable=true
### SASL ###
# Broker之间通信所使用的的协议。
security.inter.broker.protocol=SASL_PLAINTEXT
# 启用Kafka服务器简单安全认证机列表。这个列表可以包括一个可用的安全提供者的任意机置。只有GSSAPI(通用安全服务应用接口)是默认启用的。
sasl.enabled.mechanisms=PLAIN
# 用于内部的broker通信的简单身份验证和安全层机置。默认为通用安全服务应用程序接口。
sasl.mechanism.inter.broker.protocol=PLAIN
# 开启简单的ACL访问控制功能。
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 在ACL中,如果列表中没找到任何ACL规则,表默认允许所有用户操作。
# allow.everyone.if.no.acl.found=true
# 设置Broker的超级管理用户为admin。
super.users=User:admin
############################# Socket Server Settings #############################
# 设置套接字监听器所使用的安全协议和端口。PLAINTEXT是Kafka默认使用的协议。这边将其修改为SASL_PLAINTEXT协议,为了增加基于SASL的身份认证功能。
listeners=SASL_PLAINTEXT://:9092
# 设置将Kafka的地址通告给生产者和消费者,如果不设置该项,则默认使用listeners的值。
# 由于我这边是虚拟机测试环境,想要生产者和消费者与Kafka正常通信,则需要设置该项为公网地址,这才可以正常工作。
advertised.listeners=SASL_PLAINTEXT://192.168.0.193:9092
# Broker接收网络请求所开启的线程数量。
num.network.threads=3
# Broker用于处理请求的线程数量,其中可能包括磁盘IO。
num.io.threads=8
# 套接字发送数据的缓冲区大小,单位为字节。
socket.send.buffer.bytes=102400
# 套接字接受数据的缓冲区大小,单位为字节。
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 套接字接受单个请求的最大大小(防止OOM)。
log.dirs=/data/elk/kafka/kafka_data
# Broker日志和数据存储目录路径。
num.partitions=1
# 每个Topic的默认日志分区数量,更多的分区将允许更多的并行度,这也将导致会产生更多的文件,但可以增加每个Topic的并发性能。默认为1
num.recovery.threads.per.data.dir=1
# 在启动和关闭Broker服务器时,每个数据目录(log.dirs)中日志恢复的线程数量。
############################# Internal Topic Settings #############################
# 设置内部元数据Topic组的副本因子数量"__consumer_offsets"和"__transaction_state"。
# 除了开发测试之外,建议设置>1的值,比如3。
# 建议有几个节点就设置为几个副本数量。
offsets.topic.replication.factor=1
# 每个Topic的偏移值记录都在"__consumer_offsets"这个Topic中存储,该项用于设
# 置"__consumer_offsets"这个Topic的副本数量。以增加可靠性。
transaction.state.log.replication.factor=1
# 存储事务状态日志的Topic"__transaction_state"的副本数量。
transaction.state.log.min.isr=1
# 覆盖事务状态日志Topic中的min.insync.replicas配置。
# "min.insync.replicas"用于保证事务写入认为成功的最小副本数量,如果写入操作不能满足该要求,则生产者会引发一个异常"NotEnoughReplicas或NotEnoughReplicasAfterAppend"。
############################# Log Retention Policy #############################
log.retention.hours=168
# 日志(消息)保留时间,单位为小时。
# log.retention.bytes=1073741824
# 一个基于日志(消息)大小的保留策略。每个Topic下每个分区保存数据的最大文件大小。
# 当log.retention.hours和log.retention.bytes都存在时,当满足任何一个策略时都会删除。
# 该项如果设置为-1,则表示不删除。
log.segment.bytes=1073741824
# 日志段文件的最大大小,当达到此大小的时候,将创建一个新的日志段,单位为字节。
log.retention.check.interval.ms=300000
# 日志保留检查间隔时间,单位ms。
############################# Zookeeper #############################
zookeeper.connect=127.0.0.0.1:2181
# zookeeper连接地址。
zookeeper.connection.timeout.ms=18000
# zookeeper连接超时时间。
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# 设置所有成员都加入到消费者组中需要的延迟时间,以避免出现再平衡"rebalance"事件的发生。
f、配置生产者与消费者客户端
#编辑生产者
vi producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
#编辑消费者
vi consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
g、为Kafka添加用户和密码
# 创建Kafka服务器所使用的的账号密码以用于基于用户密码的身份验证
vi kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin888"
user_admin="admin888";
};
# 创建生产者或消费者客户端登录所使用的用户密码文件,用于登录到Kafka服务器(broker)。
vi kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin888";
};
# 修改启动脚本文件(将内容写入到文件的最顶部)
vi ../bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_server_jaas.conf"
vi ../bin/kafka-console-producer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_client_jaas.conf"
vi ../bin/kafka-console-consumer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_client_jaas.conf"
h、开启kafka JMX监控功能
注意:jmxremote.password 内容格式:[username] [password],jmxremote.access 内容格式:[username] [readonly|readwrite]
vi jmxremote.password
admin admin888
vi jmxremote.access
admin readonly
chmod 600 jmxremote.*
i、引入密码文件
vi ../bin/kafka-run-class.sh
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Djava.rmi.server.hostname="127.0.0.1" -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.password.file=/data/elk/kafka/config/jmxremote.password -Dcom.sun.management.jmxremote.access.file=/data/elk/kafka/config/jmxremote.access"
fi
vi ../bin/kafka-server-start.sh
export JMX_PORT="12345"
j、启动Kafka服务器(Broker)
../bin/kafka-server-start.sh -daemon /data/elk/kafka/config/server.properties
k、测试kafka
#创建Topic
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 5 --replication-factor 1 --topic yulongwang-kafka
#查看Topic列表
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#查看某个Topic状态信息
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic yulongwang-kafka
l、如果能看到如下命令输出,证明kafka已经安装成功
[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 5 --replication-factor 1 --topic yulongwang-kafka
Created topic yulongwang-kafka.
[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
yulongwang-kafka
[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic yulongwang-kafka
Topic: yulongwang-kafka TopicId: lGHOp2UKSE6R_x8OXCF4UA PartitionCount: 5 ReplicationFactor: 1 Configs:
Topic: yulongwang-kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: yulongwang-kafka Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: yulongwang-kafka Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: yulongwang-kafka Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: yulongwang-kafka Partition: 4 Leader: 0 Replicas: 0 Isr: 0
[root@localhost config]#
m、设置开机启动服务
#创建zookeeper文件
touch /lib/systemd/system/zookeeper.service
#编辑文件
vi /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root
ExecStart=/data/elk/kafka/bin/zookeeper-server-start.sh /data/elk/kafka/config/zookeeper.properties
ExecStop=/data/elk/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl enable zookeeper.service
#创建kafka文件
touch /lib/systemd/system/kafka.service
#编辑文件
vi /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root
ExecStart=/data/elk/kafka/bin/kafka-server-start.sh /data/elk/kafka/config/server.properties
ExecStop=/data/elk/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl enable kafka.service
n、常用命令
systemctl start zookeeper.service #启动
systemctl stop zookeeper.service #停止
systemctl status zookeeper.service #状态
systemctl start kafka.service #启动
systemctl stop kafka.service #停止
systemctl status kafka.service #状态
o、如果有必要的情况下,需要释放防火墙
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports
5、Logstash(处理日志)
如果不会安装,详细步骤请点击链接
Logstash用于读取从Kafka+Filebeat采集来的日志数据,结构化处理后存储到Elasticsearch
a、解压到/data/elk/logstash
#创建文件
mkdir -p /data/elk/logstash
#解压文件
tar -zxf logstash-7.17.7-linux-x86_64.tar.gz -C /data/elk/logstash --strip-components=1
b、调整JVM堆内存大小
#进入logstash配置目录
cd /data/elk/logstash/config/
#修改JVM大小vim
jvm.options
-Xms4g
-Xmx4g
c、创建Kafka客户端认证文件
vi kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin888";
};
d、将重要的数据写入到lasticsearch中
#创建Logstash管道配置文件
cp logstash-sample.conf logstash.conf
#配置Logstash管道配置文件
vi logstash.conf
input {
kafka {
bootstrap_servers => "127.0.0.1:9092"
topics => ["yulongwang-kafka"]
consumer_threads => 1
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "/data/elk/logstash/config/kafka_client_jaas.conf"
codec => "json"
}
}
# -- 处理
filter {
}
# -- 输出
output {
stdout { codec => rubydebug }
}
如果ELK及kafka都装好了的情况下,配置如下
input {
kafka {
bootstrap_servers => "127.0.0.1:9092"
topics => ["yulongwang-kafka"]
consumer_threads => 1
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path => "/data/elk/logstash/config/kafka_client_jaas.conf"
codec => "json"
}
}
filter {
mutate {
replace => { "type" => "nginx_access" }
}
grok {
patterns_dir => ["/data/elk/logstash/config/patterns"]
match => [ "message","%{NGINXACCESSA}" ]
}
date {
# 中国时间
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
#timezone => "UTC"
#
target => "@timestamp"
}
geoip {
source => "clientip"
}
mutate {
# 这里是根据参数组装了一个json字段放入es中
add_field => {
"test" => '{"httpversion":"%{httpversion}","host":"%{host}"}'
}
remove_field => ["@version","path","host","message","tags","httpversion","timestamp","agent"]
}
}
output {
#stdout { codec => rubydebug }
elasticsearch {
hosts => "127.0.0.1:9200"
user => "elastic"
password => "elastic"
manage_template => false
index => "%{[fields][hostenv]}-%{[fields][project]}-unkown"
document_type => "%{[@metadata][type]}"
}
}
e、设置服务配置端口
vi logstash.yml
api.http.port: 9600
f、启动停止logstash
nohup ./bin/logstash -f config/logstash.conf &>> /data/elk/logstash/logstash-server-`date "+%Y%m%d"`.log & echo $! > /data/elk/logstash/logstash.pid #启动
cat /data/elk/logstash/logstash.pid | xargs -I {} kill {} #停止
g、设置开机启动
#编辑启动文件
vi /etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
User=root
ExecStart=/data/elk/logstash/bin/logstash -f /data/elk/logstash/config/logstash.conf
Restart=always
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable logstash.service
#常用命令
systemctl start logstash.service #启动
systemctl stop logstash.service #停止
systemctl status logstash.service #状态
h、如果有必要的情况下,需要释放防火墙
firewall-cmd --zone=public --add-port=9600/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports
现在除了系统日志采集之外,其他日志都可以使用本系统咯