把之前的logstash笔记分享,轻拍。
Logstash
doc
https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-update
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.0.tar.gz
2.x需要java7的支持
文件
bin/logstash 主程序文件
配置文件需要自己编写
安装使用
tar *.tar.gz
cd logstash
vi logstash.conf
input output是两个必要定义的元素
filter 是可选的,作用是按规定修改数据
格式
1,命令形式 bin/logstash -e 'input { stdin { } } output { stdout {} }'
stdin 是标准输入plugin
stdout 是标准输出,屏幕
2,文件形式
input {
stdin {}
}
output {
stdout {}
}
例:文件输入,输出esasticsearch
input {
file => {
path => "/var/log/syslog"
start_position => "beginning"
ignore_older => 0
}
}
output {
esasticsearch {
hosts => [ "localhost:9200" ]
}
}
查看导入es的数据
curl -XGET 'localhost:9200/logstash-*/_search?pretty&q=response=200'
命令
logstash-plugin
list List all installed Logstash plugins
Usage:
bin/logstash-plugin list [OPTIONS] [PLUGIN]
Parameters:
[PLUGIN] Part of plugin name to search for, leave empty for all plugins
Options:
--installed List only explicitly installed plugins using bin/logstash-plugin install ... (default: false)
--verbose Also show plugin version number (default: false)
--group NAME Filter plugins per group: input, output, filter or codec
-h, --help print help
install Install a Logstash plugin
Usage:
bin/logstash-plugin install [OPTIONS] [PLUGIN] ...
Parameters:
[PLUGIN] ... plugin name(s) or file
Options:
--version VERSION version of the plugin to install
--[no-]verify verify plugin validity before installation (default: true)
--preserve preserve current gem options (default: false)
--development install all development dependencies of currently installed plugins (default: false)
--local force local-only plugin installation. see bin/logstash-plugin package|unpack (default: false)
-h, --help print help
remove Remove a Logstash plugin
update Update a plugin
pack Package currently installed plugins, Deprecated: Please use prepare-offline-pack instead
unpack Unpack packaged plugins, Deprecated: Please use prepare-offline-pack instead
generate Create the foundation for a new plugin
prepare-offline-pack Create an archive of specified plugins to use for offline installation
启动
5.x以上默认开启了xpack, 如果连不上es会报错,在logstash.yml里增加xpack.monitoring.enabled: false
命令:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
配置文件:
bin/logstash -f logstash.conf
logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
【logstash.d/*.conf 这样只会读取排第一后缀是.conf的文件】
注意:
logstash 列出目录下所有文件时,是字母排序的。而 logstash 配置段的 filter 和 output 都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用 if 判断限定不同日志的动作
参数:
--allow-unsafe-shutdown 迫使logstash退出时关闭,默认情况下,logstash将拒绝离开,直到所有收到事件输出完
-w, --pipeline-workers COUNT 工作的pipeline数量,默认是 CPU 核数
-b, --pipeline-batch-size 每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存
-u, --pipeline-batch-delay 每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms
-l, --log FILE 自身的日志
--quiet 只打印出error级别的日志
--verbose 日志级别
--debug 日志级别
-v 日志级别
-p, --pluginpath PATH 加载自己的插件
-t, --configtest 检查配置文件的语法错误
-r, --[no-]auto-reload 监视配置文件的更新,并自动加载
--reload-interval RELOAD_INTERVAL 配置文件监控的间隔,默认是3秒
--allow-env 支持shell的变量
语法
Logstash从 1.3.0 版开始支持条件判断和表达式。
字段引用、sprintf格式、条件判断只能用于filter和output,不能用于input。
if
支持: grok
不支持: mutate
if "ab" in [fieldname] {
...
} else if "a" in "ab" {
...
} else {
...
}
==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于)
=~(匹配正则), !~(不匹配正则)
in(包含), not in(不包含)
and(与), or(或), nand(非与), xor(非或)
()(复合表达式), !()(对复合表达式结果取反)
filter {
if [action] == "login" {
mutate { remove => "secret" }
}
}
output {
if "_grokparsefailure" not in [tags] {
elasticsearch { ... }
}
}
if [loglevel] == "ERROR" and [deployment] == "production"
if [foo] in [foobar]
if [foo] in "foo"
if "hello" in [greeting]
if [foo] in ["hello", "world", "foo"]
if [missing] in [alsomissing]
if !("foo" in ["hello", "world"])
if [foo] #判断filed是否存在,不能加双引号
数据类型
bool
debug => true
string
host => "hostname"
number
port => 514
array
match => ["datetime", "UNIX", "ISO8601"]
hash
options => {
key1 => "value1",
key2 => "value2"
}
注意:如果你用的版本低于 1.2.0,哈希的语法跟数组是一样的,像下面这样写:
match => [ "field1", "pattern1", "field2", "pattern2" ]
字段
#字段引用使用[]号,比如使用status做判断,if [status] = 200 {}
#若是要取得字段的值,使用 %{ip}
#取os的值,需要这样:[ua][os],可以把ua看作数组名,os是下标。
字段是 Logstash::Event 对象的属性
字段引用
[tags]
多维哈希表,或者叫哈希的哈希值获取
options => {
a => {
b => "b"
}
}
[options][a][0]
logstash 的数组也支持倒序下标,即 [geoip][location][-1] 可以获取数组最后一个元素的值。
Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:
"the longitude is %{[geoip][location][0]}"
配置
host字段自定义
host会使用hosts文件的第一行作为内容
127.0.0.1 localhost
通用
每个 logstash 过滤插件,都会有四个方法叫 add_tag, remove_tag, add_field 和 remove_field
type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的
input
file
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。
sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos
一些比较有用的配置项:
tag
此设置没有默认值。向您的活动添加任意数量的任意标签
discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
close_older
一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时。
ignore_older
在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天。
sincedb_path
记录文件读取到的位置(如果文件被切割,我的想法是:logstash只会写新的值进来,新进来的日志仍然正常读取)
如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。
sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position
只有在一开始的时候才有效,如果有了sincedb的信息,此选项无效
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F。
注意
FileWatch 只支持文件的绝对路径,而且会不自动递归目录。
能写成 path => "/path/to/*/*/*/*.log"。FileWatch 模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用 ** 来缩写表示递归全部子目录。
start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos 开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件(官方博客上提供了另一个巧妙的思路:将 sincedb_path 定义为 /dev/null,则每次重启自动从头开始读)。
因为 windows 平台上没有 inode 的概念,Logstash 某些版本在 windows 平台上监听文件不是很靠谱。windows 平台上,推荐考虑使用 nxlog。
stdin
标准输入
syslog
本机的 syslog 就会默认发送到 logstash 里
redis
list => BLPOP
channel => SUBSCRIBE
pattern_channel => PSUBSCRIBE
配置:
input {
redis {
batch_count => 1
data_type => "list"
key => "logstash-*"
host => "192.168.0.2"
port => 6379
threads => 5
}
}
kafka {
#https://www.elastic.co/guide/en/logstash/2.3/plugins-inputs-kafka.html#plugins-inputs-kafka-zk_connect
#----------- 5.0以下的配置
zk_connect => "10.26.93.65:2181,127.0.0.1:2181" #Zookeeper的地址
group_id => "nginx_log"
#topic订阅有三个可选topic_id or white_list or black_list
#white_list => "sudiyi_.*" #Whitelist of topics to include for consumption.
#black_list => "xx.*|aabb" #Blacklist of topics to exclude from consumption.
topic_id => "app_server" #单个topic订阅
consumer_threads => 5 #应该与kafka的Partition一样
# ----------5.0以上配置
bootstrap_servers => "10.26.93.65:9092,127.0.0.1:9092" #kafka集群的地址
group_id => "nginx_log"
#topic
codec => "json" #约定数据传输格式,就是编码和解码
topics => ["sudiyi_express_admin","user_center_service"]
#topics_pattern => "user_center.*" #如果这个存在,topics会被忽略
#线程完美的配置应该和kafkapartitions一样,过多的线程是浪费的
consumer_threads => 3
}
编码插件
input | decode | filter | encode | output
filter
grok
调试:http://grokdebug.herokuapp.com/
自带:
%{IPORHOST:clientip}
%{USER:ident}
%{USER:auth}
%{HTTPDATE:timestamp}
%{WORD:verb}
%{NOTSPACE:request}
%{NUMBER:httpversion}
%{DATA:rawrequest}
%{NUMBER:response}
%{NUMBER:bytes}
%{COMMONAPACHELOG}
%{QS:referrer}
%{QS:agent}
pattern => “\[(?<datetime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s(?<level>\w*)\s\”Crawl\surl:(?<url>(.*))”
Grok 支持把预定义的 grok 表达式 写入到文件中
格式
定义:USERNAME [a-zA-Z0-9._-]+
引用:USER %{USERNAME}
%{PATTERN_NAME:capture_name:data_type}
实例:%{NUMBER:request_time:float}
patterns_dir 选项来指明grok 表达式文件
patterns_dir => "/path/to/your/own/patterns"
多行匹配
在和 codec/multiline 搭配使用的时候,需要注意一个问题,grok 正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要 =~ //m 一样也需要单独指定,具体写法是在表达式开始位置加 (?m) 标记
多项选择
match => [
"message", "(?<request_time>\d+(?:\.\d+)?)",
"message", "%{SYSLOGBASE} %{DATA:message}",
"message", "(?m)%{WORD}"f
]
logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止
remove_field 参数来删除掉 message 字段,或者用 overwrite 参数来重写默认的 message 字段,只保留最重要的部分
remove_fileld => ["message"]
dissect
logstash不自带, 需要手动安装bin/logstash-plugin install logstash-filter-dissect
这是一个跟grok比较像的过滤器
一个dissect过滤器最好只mapping一个字段,尝试过mapping两个字段会报key的错误,因为第二个字段是从第一个字段产生的,可能是因为第二个字段不能及时产生造成的报错
例子数据:
2017-05-04 19:06:48,079 [ForkJoinPool-1-worker-11] INFO c.s.t.s.s.i.ReceiveEventServiceImpl:148: Heartbeat deviceId= 1000269
mapping => {
"message" => "%{datetime} [%{thread_name}] %{level} %{action} %{info_name} %{info_body}"
}
# "要匹配的字段" => "匹配语法"
#结果: datetime: 2017-05-04 19:06:48,079
thread_name: ForkJoinPool-1-worker-11
level: INFO
...
#Heartbeat后面归到一个fileld,然后可以用kv过滤器处理
date
#匹配上了会自动替换@timestamp字段
#SSS是毫秒
date {
# [ "field_name", "时间格式1", "时间格式2" ]
match => ["date_field", "ISO8601", "Y-MM-dd HH:mm:ss,SSS"]
remove_field => [ "date_field", "my_extraneous_field" ] 删除
}
UNIX_MS 毫秒时间戳
UNIX 秒时间戳
很多中国用户经常提一个问题:为什么 @timestamp 比我们晚了 8 个小时?怎么修改成北京时间?
其实,Elasticsearch 内部,对时间类型字段,是统一采用 UTC 时间,存成 long 长整形数据的!对日志统一采用 UTC 时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里——不像中国,地域横跨五个时区却只用北京时间。
对于页面查看,ELK 的解决方案是在 Kibana 上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。
所以,建议大家接受这种设定。否则,即便你用 .getLocalTime 修改,也还要面临在 Kibana 上反过去修改,以及 Elasticsearch 原有的 ["now-1h" TO "now"] 这种方便的搜索语句无法正常使用的尴尬。
geoip
geoip {
source => "message"
}
过滤
geoip {
fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
}
需要注意的是:geoip.location 是 logstash 通过 latitude 和 longitude 额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:
filter {
geoip {
fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]
remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
}
}
mutate
数据类型修改
mutate {
convert => ["request_time", "float"]
#5.2语法
#convert => { "info_content[deviceId]" => "integer" }
#convert => { "msgId" => "integer" }
}
字符串处理
gsub
通过应用正则表达式和替换来转换字符串字段。 如果字段不是字符串,则不会执行任何操作。
add和gsub最好不要放到一个mutate里面,有时候刚add,gsub执行没有效果
gsub => [
"urlparams", "[\\?#]", "_", #替换反斜杠,问号,#号
"fieldname", "/", "_" # _替换为/
]
split
filter {
mutate {
split => ["message", "|"]
}
}
join
仅对数组类型字段有效
我们在之前已经用 split 割切的基础再 join 回去。配置改成:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
替换一个field的内容, %{}是引用字段的内容
replace => {
"type" => "app_nginx_%{type}"
}
增加字段
add_field => { "visit_device" => "%{[http_user_agent][0]}" }
merge
合并两个数组或者哈希字段。依然在之前 split 的基础上继续:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "message"]
}
}
字段处理
rename
重命名某个字段,如果目的字段已经存在,会被覆盖掉:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
update
更新某个字段的内容。如果字段不存在,不会新建。
mutate {
update => { "sample" => "My new message" }
}
replace
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
执行次序
需要注意的是,filter/mutate 内部是有执行次序的。其次序如下:
rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)
split 拆分事件
split 插件中使用的是 yield 功能,其结果是 split 出来的新事件,会直接结束其在 filter 阶段的历程,也就是说写在 split 后面的其他 filter 插件都不起作用,进入到 output 阶段。所以,一定要保证 split 配置写在全部 filter 配置的最后
terminator 指定切割的间隔符, Default value is "\n"
field 指定切割的域, default value is "message"
kv
处理key=value这种数据结构
source 数据源,Default value is "message"
target 把取出来的数据放到一个容器里面,相当于list
target => "kv"
transform_key 转变key的值为大写或小写
transform_value 转变value的值为大写或小写
trimkey 修剪key包含的字符like [ or ] using \.
trim 修剪value包含的字符like [ or ] using \.(有时候value后面会包括一些无用的分隔符,需要用这个设置处理掉)
value_split 按什么符号切分,Default value is "="
output
file {
path => "/path/xx"
codec => line { format => "custom format: %{message}"}
}
codec => dots #把每个 event 都变成一个点(.)
codec=>rubydebug
null {} 抛弃所有 event
输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备
kafka
kafka版本是0.9要对应使用logstash版本2.3.x以下,否则导不了数据到kafka
fluntd也有kafka插件
kafka
kafka{
bootstrap_servers => "xx.sudiyi.cn:9092" #由于kafka集群配置,可能这里需要使用域名:端口(在host文件里配)
topic_id => "test01"
codec => "json" #约定数据传输格式,就是编码和解码
# https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#codec
# json
# plain 无格式
}
elasticsearch
HTTP , node 和 transport 方式
elasticsearch {
hosts => ["es-node02:9200"] #最好在hosts里映射ip地址
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
workers => 1 #5.x这个是string类型
flush_size => 20000
idle_flush_time => 10
template_overwrite => true
user => ""
password => ""
}
配置
批量发送
flush_size 和 idle_flush_time 共同控制 Logstash 向 Elasticsearch 发送批量数据的行为。以上面示例来说:Logstash 会努力攒到 20000 条数据一次性发送出去,但是如果 10 秒钟内也没攒够 20000 条,Logstash 还是会以当前攒到的数据量发一次。
索引名
写入的 ES 索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash 提供了 %{+YYYY.MM.dd} 这种写法。在语法解析的时候,看到以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……
此外,注意索引名中不能有大写字母,否则 ES 在日志中会报 InvalidIndexNameException,但是 Logstash 不会报错,这个错误比较隐晦,也容易掉进这个坑中。
Logstash 1.4.2 在 transport 和 http 协议的情况下是固定连接指定 host 发送数据。从 1.5.0 开始,host 可以设置数组,它会从节点列表中选取不同的节点发送数据,达到 Round-Robin 负载均衡的效果。
Kibana4 强制要求 ES 全集群所有 node 版本在 1.4 以上,Kibana4.2 要求 ES 2.0 以上。所以采用 node 方式发送数据的 logstash-1.4(携带的 Elasticsearch.jar 库是 1.1.1 版本) 会导致 Kibana4 无法运行,采用 Kibana4 的读者务必改用 http 方式。
开发者在 IRC freenode#logstash 频道里表示:"高于 1.0 版本的 Elasticsearch 应该都能跟最新版 logstash 的 node 一起正常工作"。此信息仅供参考,请认真测试后再上线。
经常有同学问,为什么 Logstash 在有多个 conf 文件的情况下,进入 ES 的数据会重复,几个 conf 数据就会重复几次。其实问题原因在之前启动参数章节有提过,output 段顺序执行,没有对日志 type 进行判断的各插件配置都会全部执行一次。在 output 段对 type 进行判断的语法如下所示:
output {
if [type] == "nginxaccess" {
elasticsearch { }
}
}
老版本的性能问题
Logstash 1.4.2 在 http 协议下默认使用作者自己的 ftw 库,随同分发的是 0.0.39 版。该版本有内存泄露问题,长期运行下输出性能越来越差!
解决办法:
对性能要求不高的,可以在启动 logstash 进程时,配置环境变量 ENV["BULK"],强制采用 elasticsearch 官方 Ruby 库。命令如下:
export BULK="esruby"
对性能要求高的,可以尝试采用 logstash-1.5.0RC2 。新版的 outputs/elasticsearch 放弃了 ftw 库,改用了一个 JRuby 平台专有的 Manticore 库。根据测试,性能跟 ftw 比相当接近。
对性能要求极高的,可以手动更新 ftw 库版本,目前最新版是 0.0.42 版,据称内存问题在 0.0.40 版即解决。
模板
Elasticsearch 支持给索引预定义设置和 mapping(前提是你用的 elasticsearch 版本支持这个 API)。Logstash 自带有一个优化好的模板
默认使用./vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/elasticsearch-template.json 这个模板去插入elasticsearch数据,在比较老的版本模板不会自动创建.raw,这会造成elasticsearch在对字符串做聚合查询的时间报Field data loading is forbidden on [xxxx]错误
附,之前的一个匹配:
patterns:
STATUS1 [A-Z]+
TIME \d+\.\d+
DATE \d{4}-\d{1,2}-\d{1,2}\ \d{1,2}:\d{1,2}:\d{1,2}
RESPONSE_AGREEMENT [A-Z]+
METHOD (POST)|(GET)
RESPONSE_METHOD /\S+
DEVICE_ID \d{1,7}
mail_nu \d{1,50}
mobiles \d+
tag1 [a-zA-Z0-9._-]+
MAIN_SC \[%{DATE:logtime}\] device_id: %{DEVICE_ID:device_id}, mail_no: %{mail_nu:mail_no}, mobile: %{mobiles:mobile}, status: %{STATUS1:status}, \[%{tag1:tag}\]
日志:
[2017-05-22 17:06:57] device_id: 1022456, mail_no: 3965330139219, mobile: 13603840208, status: OK, [NEW_API]