1、安装Java

如果不会安装,详细步骤请点击链接

a、解压到/usr/local/jdk1.8

#创建jdk1.8目录
mkdir -p /usr/local/jdk1.8
#解压文件
tar -zxf jdk-8u391-linux-x64.tar.gz -C /usr/local/jdk1.8 --strip-components=1

b、配置环境变量

#配置环境变量
vi /etc/profile
#将以下代码输入到文件中
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 
export PATH=$JAVA_HOME/bin:$PATH
#保存退出后刷新环境变量
source /etc/profile

c、查看版本

java -version

2、Elasticsearch(存储日志)

如果不会安装,详细步骤请点击链接

a、解压到/data/elk/elasticsearch

#创建elasticsearch目录
mkdir -p /data/elk/elasticsearch
#解压文件
tar -zxf elasticsearch-7.17.7-linux-x86_64.tar.gz -C /data/elk/elasticsearch --strip-components=1

b、修改配置

#进入配置目录
cd /data/elk/elasticsearch/config/
#编辑主配置文件 elasticsearch.yml
vi elasticsearch.yml
cluster.name: my-application
node.name: node-1
node.master: true
node.data: true
path.data: /data/elk/elasticsearch/data
path.logs: /data/elk/elasticsearch/logs
bootstrap.memory_lock: true
network.host: 127.0.0.1
http.port: 9200
transport.port: 9300
discovery.type: single-node
discovery.seed_hosts: ["127.0.0.1"]
gateway.recover_after_nodes: 1
action.destructive_requires_name: true
cluster.routing.allocation.disk.threshold_enabled: false
cluster.max_shards_per_node: 10000
# 设置集群中节点最大分片数量,即单个节点可创建的最大索引数量。默认为1000。
## x-pack ###
xpack.security.enabled: true
#增加跨域配置
http.cors.enabled: true
http.cors.allow-origin: "*"

c、修改系统配置

#修改最大打开文件数
vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
* soft nproc 4096
* hard nproc 4096
#修改最大的链接数
vi /etc/sysctl.conf
vm.max_map_count=262144

d、创建运行用户

#创建用户
useradd -M elk
#设置密码
passwd elk
#授权目录
chown -R elk.elk /data/elk/elasticsearch

e、重启服务

reboot

f、切换用户启动

su elk
cd /data/elk/elasticsearch/bin
./elasticsearch -d

g、创建内置账号的密码

elastic,一个内置的超级用户。关联角色superuser。
apm_system,用于Elastic APM在Elasticsearch中存储监控信息时使用的用户。关联角色apm_system。
kibana,用于Kibana连接Elasticsearch并与之通信(已过时的内置用户)。关联角色kibana_system。
kibana_system,用户Kibana连接Elasticsearch并与之通信。关联角色kibana_system。
logstash_system,用于Logstash在Elasticsearch中存储监控信息时使用。关联角色logstash_system。
beats_system,用于Beats在Elasticsearch中存储监控信息时使用。关联角色beats_system。
remote_monitoring_user ,用于Metricbeat在Elasticsearch中收集和存储监控信息时使用。关联remote_monitoring_agent和remote_monitoring_collector角色 

elasticsearch-setup-passwords interactive

h、使用账号访问Elasticsearch(列出所有索引)

curl -X GET -u elastic:elastic http://127.0.0.1:9200/_cat/indices?v

i、设置开机启动

#创建elasticsearch服务文件
touch /etc/systemd/system/elasticsearch.service
#编辑elasticsearch服务文件
vi /etc/systemd/system/elasticsearch.service
[Unit]
Description=Elasticsearch
After=network.target
[Service]
Type=simple
User=elk
LimitMEMLOCK=infinity
LimitNOFILE=65535
WorkingDirectory=/data/elk/elasticsearch
ExecStart=/data/elk/elasticsearch/bin/elasticsearch
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable elasticsearch
#重启
reboot

j、elasticsearch操作命令

systemctl start elasticsearch.service	启动
systemctl status elasticsearch.service	查看状态
systemctl restart elasticsearch.service	重启
systemctl stop elasticsearch.service	停止

k、有必要的情况下,开启防火墙

firewall-cmd --zone=public --add-port=9200/tcp --permanent #单机版的
firewall-cmd --zone=public --add-port=9300/tcp --permanent #集群版的
firewall-cmd --reload
firewall-cmd --list-ports

3、Kibana(日志搜索与展示)

a、解压到/data/elk/kibana

#创建kibana目录
mkdir -p /data/elk/kibana/logs
#解压文件
tar -zxf kibana-7.17.7-linux-x86_64.tar.gz -C /data/elk/kibana --strip-components=1

b、修改配置

#进入配置文件
cd /data/elk/kibana/config
#修改配置文件
vi kibana.yml
server.port: 5601
server.host: "127.0.0.1"
server.publicBaseUrl: "http://es.elk.com"
elasticsearch.hosts: ["http://127.0.0.1:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "elastic"
logging.dest: /data/elk/kibana/logs/kibana.log
i18n.locale: "zh-CN"
#创建日志文件
cd /data/elk/kibana/logs/
touch kibana.log

c、使用elk用户启动Kibana

chown -R elk.elk /data/elk/kibana/
cd /data/elk/kibana/
nohup sudo -u elk bin/kibana &

d、有必要的情况下,开启防火墙

firewall-cmd --zone=public --add-port=5601/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

e、我采用的是nginx访问的

server {
    listen        80;
    server_name es.elk.com;
    error_log /data/nginx/es.elk.com_error.log crit;
    access_log /data/nginx/es.elk.com_acess_$logdate.log access-upstream;
    error_page   403 /403.html;
    error_page   404 /404.html;
    error_page      500 502 503 504 /50x.html;
    location / {
    proxy_pass http://127.0.0.1:5601;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
    location = /403.html {
    root "/data/wwwroot/error";
    }
    location = /404.html {
    root "/data/wwwroot/error";
    }
    location = /50x.html {
    root "/data/wwwroot/error";
    }
}

f、开机启动

#创建kibana服务文件
touch /etc/systemd/system/kibana.service
#编辑kibana服务文件
vi /etc/systemd/system/kibana.service
[Unit]
Description=kibana
After=network.target
[Service]
Type=simple
User=elk
ExecStart=/data/elk/kibana/bin/kibana
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable kibana
#重启
reboot

g、kibana常用命令

systemctl start kibana.service	启动
systemctl status kibana.service	查看状态
systemctl restart kibana.service	重启
systemctl stop kibana.service	停止

4、Kafka(消息队列)

如果不会安装,详细步骤请点击链接

a、解压到/data/elk/kafka/

#创建目录
mkdir -p /data/elk/kafka
#解压文件
tar -zxf kafka_2.12-2.8.1.tgz -C /data/elk/kafka --strip-components=1

b、创建数据目录

cd /data/elk/kafka
mkdir kafka_data
mkdir zk_data

c、配置zookeeper

#进入配置文件目录
cd config/
#编辑zookeeper.properties
vi zookeeper.properties
dataDir=/data/elk/kafka/zk_data
# zookeeper的数据目录。
clientPort=2181
# zookeeper监听端口。
maxClientCnxns=0
# 最大客户端并发连接数量。
### SASL ###
# 开启ZookeeperClient支持SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

d、启动zookeeper

../bin/zookeeper-server-start.sh -daemon /data/elk/kafka/config/zookeeper.properties

e、配置Kafka服务端(Broker)

#编辑server.properties
vi server.properties
############################# Server Basics #############################
# BrokerID,用于Kafka集群中代理节点的全局唯一编号。
broker.id=0
# 设置单个消息的最大值,默认为1M。
message.max.bytes=10000000
# 设置允许对Topic进行删除操作,否则无法删除Topic。只有开启此项的时候Topic才会被真正删除,否则Topic仅只是被标记为删除而已。为了安全性考虑的结果。
# delete.topic.enble=true
# 设置当生产者要推送消息的Topic不存在时自动创建对应的Topic。默认为true。
# auto.create.topics.enable=true
### SASL ###
# Broker之间通信所使用的的协议。
security.inter.broker.protocol=SASL_PLAINTEXT
# 启用Kafka服务器简单安全认证机列表。这个列表可以包括一个可用的安全提供者的任意机置。只有GSSAPI(通用安全服务应用接口)是默认启用的。
sasl.enabled.mechanisms=PLAIN
# 用于内部的broker通信的简单身份验证和安全层机置。默认为通用安全服务应用程序接口。
sasl.mechanism.inter.broker.protocol=PLAIN
# 开启简单的ACL访问控制功能。
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 在ACL中,如果列表中没找到任何ACL规则,表默认允许所有用户操作。
# allow.everyone.if.no.acl.found=true
# 设置Broker的超级管理用户为admin。
super.users=User:admin
############################# Socket Server Settings #############################
# 设置套接字监听器所使用的安全协议和端口。PLAINTEXT是Kafka默认使用的协议。这边将其修改为SASL_PLAINTEXT协议,为了增加基于SASL的身份认证功能。
listeners=SASL_PLAINTEXT://:9092
# 设置将Kafka的地址通告给生产者和消费者,如果不设置该项,则默认使用listeners的值。
# 由于我这边是虚拟机测试环境,想要生产者和消费者与Kafka正常通信,则需要设置该项为公网地址,这才可以正常工作。
advertised.listeners=SASL_PLAINTEXT://192.168.0.193:9092
# Broker接收网络请求所开启的线程数量。
num.network.threads=3
# Broker用于处理请求的线程数量,其中可能包括磁盘IO。
num.io.threads=8
# 套接字发送数据的缓冲区大小,单位为字节。
socket.send.buffer.bytes=102400
# 套接字接受数据的缓冲区大小,单位为字节。
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 套接字接受单个请求的最大大小(防止OOM)。
log.dirs=/data/elk/kafka/kafka_data
# Broker日志和数据存储目录路径。
num.partitions=1
# 每个Topic的默认日志分区数量,更多的分区将允许更多的并行度,这也将导致会产生更多的文件,但可以增加每个Topic的并发性能。默认为1
num.recovery.threads.per.data.dir=1
# 在启动和关闭Broker服务器时,每个数据目录(log.dirs)中日志恢复的线程数量。
############################# Internal Topic Settings  #############################
# 设置内部元数据Topic组的副本因子数量"__consumer_offsets"和"__transaction_state"。
# 除了开发测试之外,建议设置>1的值,比如3。
# 建议有几个节点就设置为几个副本数量。
offsets.topic.replication.factor=1
# 每个Topic的偏移值记录都在"__consumer_offsets"这个Topic中存储,该项用于设
# 置"__consumer_offsets"这个Topic的副本数量。以增加可靠性。
transaction.state.log.replication.factor=1
# 存储事务状态日志的Topic"__transaction_state"的副本数量。
transaction.state.log.min.isr=1
# 覆盖事务状态日志Topic中的min.insync.replicas配置。
# "min.insync.replicas"用于保证事务写入认为成功的最小副本数量,如果写入操作不能满足该要求,则生产者会引发一个异常"NotEnoughReplicas或NotEnoughReplicasAfterAppend"。
############################# Log Retention Policy #############################
log.retention.hours=168
# 日志(消息)保留时间,单位为小时。
# log.retention.bytes=1073741824
# 一个基于日志(消息)大小的保留策略。每个Topic下每个分区保存数据的最大文件大小。
# 当log.retention.hours和log.retention.bytes都存在时,当满足任何一个策略时都会删除。
# 该项如果设置为-1,则表示不删除。
log.segment.bytes=1073741824
# 日志段文件的最大大小,当达到此大小的时候,将创建一个新的日志段,单位为字节。
log.retention.check.interval.ms=300000
# 日志保留检查间隔时间,单位ms。
############################# Zookeeper #############################
zookeeper.connect=127.0.0.0.1:2181
# zookeeper连接地址。
zookeeper.connection.timeout.ms=18000
# zookeeper连接超时时间。
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# 设置所有成员都加入到消费者组中需要的延迟时间,以避免出现再平衡"rebalance"事件的发生。

f、配置生产者与消费者客户端

#编辑生产者
vi producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
#编辑消费者
vi consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

g、为Kafka添加用户和密码

# 创建Kafka服务器所使用的的账号密码以用于基于用户密码的身份验证
vi kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin888"
        user_admin="admin888";
};
# 创建生产者或消费者客户端登录所使用的用户密码文件,用于登录到Kafka服务器(broker)。
vi kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin888";
};
# 修改启动脚本文件(将内容写入到文件的最顶部)
vi ../bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_server_jaas.conf"
vi ../bin/kafka-console-producer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_client_jaas.conf"
vi ../bin/kafka-console-consumer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/elk/kafka/config/kafka_client_jaas.conf"

h、开启kafka JMX监控功能

注意:jmxremote.password 内容格式:[username] [password],jmxremote.access 内容格式:[username] [readonly|readwrite]
vi jmxremote.password
admin admin888
vi jmxremote.access
admin readonly
chmod 600 jmxremote.*

i、引入密码文件

vi ../bin/kafka-run-class.sh
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
    KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Djava.rmi.server.hostname="127.0.0.1" -Dcom.sun.management.jmxremote.authenticate=true  -Dcom.sun.management.jmxremote.ssl=false  -Dcom.sun.management.jmxremote.password.file=/data/elk/kafka/config/jmxremote.password -Dcom.sun.management.jmxremote.access.file=/data/elk/kafka/config/jmxremote.access"
fi
vi ../bin/kafka-server-start.sh
export JMX_PORT="12345"

j、启动Kafka服务器(Broker)

../bin/kafka-server-start.sh -daemon /data/elk/kafka/config/server.properties

k、测试kafka

#创建Topic
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 5 --replication-factor 1 --topic yulongwang-kafka
#查看Topic列表
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#查看某个Topic状态信息
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic yulongwang-kafka

l、如果能看到如下命令输出,证明kafka已经安装成功

[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 5 --replication-factor 1 --topic yulongwang-kafka
Created topic yulongwang-kafka.
[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
yulongwang-kafka
[root@localhost config]# ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic yulongwang-kafka
Topic: yulongwang-kafka TopicId: lGHOp2UKSE6R_x8OXCF4UA PartitionCount: 5       ReplicationFactor: 1    Configs: 
        Topic: yulongwang-kafka Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: yulongwang-kafka Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: yulongwang-kafka Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: yulongwang-kafka Partition: 3    Leader: 0       Replicas: 0     Isr: 0
        Topic: yulongwang-kafka Partition: 4    Leader: 0       Replicas: 0     Isr: 0
[root@localhost config]# 

m、设置开机启动服务

#创建zookeeper文件
touch /lib/systemd/system/zookeeper.service
#编辑文件
vi /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root
ExecStart=/data/elk/kafka/bin/zookeeper-server-start.sh /data/elk/kafka/config/zookeeper.properties
ExecStop=/data/elk/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl enable zookeeper.service

#创建kafka文件
touch /lib/systemd/system/kafka.service
#编辑文件
vi /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root
ExecStart=/data/elk/kafka/bin/kafka-server-start.sh /data/elk/kafka/config/server.properties
ExecStop=/data/elk/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl enable kafka.service

n、常用命令

systemctl start zookeeper.service #启动
systemctl stop zookeeper.service #停止
systemctl status zookeeper.service #状态

systemctl start kafka.service #启动
systemctl stop kafka.service #停止
systemctl status kafka.service #状态

o、如果有必要的情况下,需要释放防火墙

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

5、Logstash(处理日志)

如果不会安装,详细步骤请点击链接

Logstash用于读取从Kafka+Filebeat采集来的日志数据,结构化处理后存储到Elasticsearch

a、解压到/data/elk/logstash

#创建文件
mkdir -p /data/elk/logstash
#解压文件
tar -zxf logstash-7.17.7-linux-x86_64.tar.gz -C /data/elk/logstash --strip-components=1

b、调整JVM堆内存大小

#进入logstash配置目录
cd /data/elk/logstash/config/
#修改JVM大小vim
jvm.options
-Xms4g
-Xmx4g

c、创建Kafka客户端认证文件

vi kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin888";
};

d、将重要的数据写入到lasticsearch中

#创建Logstash管道配置文件
cp logstash-sample.conf logstash.conf
#配置Logstash管道配置文件
vi logstash.conf
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["yulongwang-kafka"]
    consumer_threads => 1

    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    jaas_path => "/data/elk/logstash/config/kafka_client_jaas.conf"

    codec => "json"
  }
}

# -- 处理
filter {
}

# -- 输出
output {
  stdout { codec => rubydebug }
}

如果ELK及kafka都装好了的情况下,配置如下
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["yulongwang-kafka"]
    consumer_threads => 1
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    jaas_path => "/data/elk/logstash/config/kafka_client_jaas.conf"
    codec => "json"
  }
}
filter {
  mutate { 
    replace => { "type" => "nginx_access" }
  }
  grok {
    patterns_dir => ["/data/elk/logstash/config/patterns"]
    match => [ "message","%{NGINXACCESSA}" ]
  }
  date {
    # 中国时间
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    #timezone => "UTC"
    # 
    target => "@timestamp"
  }
  geoip {
    source => "clientip"
  }
  mutate {
    # 这里是根据参数组装了一个json字段放入es中
    add_field => {
      "test" => '{"httpversion":"%{httpversion}","host":"%{host}"}'
    }
    remove_field => ["@version","path","host","message","tags","httpversion","timestamp","agent"]
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch {
      hosts => "127.0.0.1:9200"
      user => "elastic"
      password => "elastic"
      manage_template => false
      index => "%{[fields][hostenv]}-%{[fields][project]}-unkown"
      document_type => "%{[@metadata][type]}"
  }
}

e、设置服务配置端口

vi logstash.yml
api.http.port: 9600

f、启动停止logstash

nohup ./bin/logstash -f config/logstash.conf &>> /data/elk/logstash/logstash-server-`date "+%Y%m%d"`.log & echo $! > /data/elk/logstash/logstash.pid #启动
cat /data/elk/logstash/logstash.pid | xargs -I {} kill {} #停止

g、设置开机启动

#编辑启动文件
vi /etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
User=root
ExecStart=/data/elk/logstash/bin/logstash -f /data/elk/logstash/config/logstash.conf
Restart=always
[Install]
WantedBy=multi-user.target
#加入开机启动
systemctl daemon-reload
systemctl enable logstash.service
#常用命令
systemctl start logstash.service #启动
systemctl stop logstash.service #停止
systemctl status logstash.service #状态

h、如果有必要的情况下,需要释放防火墙

firewall-cmd --zone=public --add-port=9600/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports

现在除了系统日志采集之外,其他日志都可以使用本系统咯

08-27 05:03