一、背景

当我们使用logstash从外部读取到数据后,默认情况下读取到的值都是string的类型,假设我们这个时候需要修改字段值的类型,如果从string修改成integer,或者删除字段、修改字段的名字、给字段一个默认值等操作时,这个时候我们就可以借助 mutate filter 来实现。

二、需求

1、从文件中读取数据,文件中的数据符合csv的格式,即默认是以,分隔。

2、对读取到的字段进行 删除字段、修改字段的值、修改字段的类型、给一个默认值、字段合并等操作。

三、实现步骤

1、安装 csv codec 插件

注意⚠️:

默认情况下,csv codec插件并没有安装,需要我们自己手动安装一下,执行如下命令bin/logstash-plugin install logstash-codec-csv

# 进入 logstash 的安装目录
cd /Users/huan/soft/elastic-stack/logstash/logstash
# 监测是否安装了 csv codec 插件
bin/logstash-plugin list --verbose
# 安装 csv 插件
bin/logstash-plugin install logstash-codec-csv

2、准备需要读取的文件数据

张三zhangSan20湖北省;罗田县学历-本科去首尾空格java默认值20210512 08:47:03
李四lisi18湖北省;黄冈学历-专科去首 空格C 20210512 03:12:20

3、编写 pipeline ,读取和输出数据

input {
    file {
        id => "mutate-id"
        path => ["/Users/huan/soft/elastic-stack/logstash/logstash/pipeline.conf/filter-mutate/mutate.csv"]
        start_position => "beginning"
        sincedb_path => "/Users/huan/soft/elastic-stack/logstash/logstash/pipeline.conf/filter-mutate/sincedb.db"
        codec => csv {
            columns => ["user_real_name","user_english_name","age","address","education","strip_blank","language","default_value","create_time"]
            charset => "UTF-8"
            separator => ","
            skip_empty_columns => false
            convert => {
                "age" => "integer"
            }
        }
    }
}

output {
    stdout {
        codec => rubydebug {

        }
    }
}
  • csv codec 插件解释

    • columns :定义一组解析后的csv的列名,也是后期的字段名
    • charset:字符编码
    • separator:定义读取到的一行数据,以什么作为分隔,csv文件一般是以,或tab等进行分隔,默认是逗号
    • skip_empty_columns:如果值为空,是否跳过空列。

      • true:跳过
      • false: 不跳过
    • convert:数据类型转换,默认读取到的值的类型都是string,此处将 age字段的值的数据类型转换成了integer

4、mutate 插件的使用

前置条件:

1、如无特殊说明,测试数据的数据为 实现步骤>准备需要读取的文件数据 中的数据

注意事项:

1、updatereplace都是更新字段的值,但是如果update更新的字段的值不存在,那么没有效果,但是replace会新增加这个字段。

2、copy字段的目标值,如果存在则覆盖值,否则新增加一个字段。

1、coerce 给字段设置默认值

如果某个字段已经存在,并且它的值是null,那么我们可以使用coerce来为它设置默认值

1、配置文件的写法

filter {
    mutate {
        coerce => {
            "default_value" => "该字段没有值,设置一个默认值"
        }
    }
}

2、执行结果

2、rename 给字段重命名

1、配置文件的写法

filter {
    mutate {
        rename => {
            "user_real_name" => "[user][real_name]"
            "user_english_name" => "[user][english_name]"
            "age" => "年龄"
        }
    }
}

2、执行结果

3、update 更新字段的值

1、配置文件的写法

filter {
    mutate {
        # 1、更新字段的值
        update => {
            "user_address" => "用户的地址是: %{address}"
        }
    }
}

2、执行结果

3、解释

update进行更新值,更新的字段必须要存在,否则没有任何效果。

4、replace 更新字段的值

1、配置文件的写法

filter {
    mutate {
        # 1、更新字段的值
        replace => {
            "user_address" => "用户的地址是: %{address}"
        }
    }
}

2、执行结果

5、convert 数据类型转换

1、可以转换的数据类型

integer、integer_eu、float、float_eu、string、boolean

2、配置文件的写法

filter {
    mutate {
        # 1、数据类型转换
        convert => {
            "age" => "string"
        }
    }
}

3、执行结果

6、gsub 对字段内容进行替换

1、配置文件的写法

filter {
    mutate {
        # 1、替换字段的内容, 第二个参数可以写正则表达式 ,替换的字段 只能是 string 类型或者 string类型的数组
        gsub => [
            "address", ";", "--"
        ]
    }
}

2、执行结果

7、uppercase、capitalize、lowercase 大写、首字母大写、小写

1、配置文件的写法

filter {
    mutate {
        # 1.1 大写
        uppercase => ["language"]
        # 2.2 小写
        # lowercase => ["user_english_name"]
        # 3.3 首字母大写
        capitalize => ["user_english_name"]
    }
}

需要注意优先级。

2、执行结果

8、strip 去除首尾空格

1、配置文件的写法

filter {
    mutate {
        # 去除首尾空格
        strip => ["strip_blank"]
    }
}

2、执行结果

9、remove 移除字段

1、配置文件的写法

filter {
    mutate {
        # 移除字段 ,如果 Event 中 username 的值是 zhangsan ,那么会移除字段名是 foo_zhangsan 这个字段。
        remove_field => ["user_real_name","foo_%{username}"]
    }
}

2、执行结果

10、split 切割字段

1、配置文件的写法

filter {
    mutate {
        # 1、切割字段
        split => {
            "address" => ";"
        }
    }
}

2、执行结果

11、 join 连接字段

1、配置文件的写法

filter {
    mutate {
        # 1、切割字段
        split => {
            "address" => ";"
        }

        # 2、连接字段
        join => {
            "address" => "***"
        }
    }
}

先使用 split 切割成数组,然后使用 join 连接

2、执行结果

12、merge 字段合并

1、可以合并的情况

`array` + `string` will work
`string` + `string` will result in an 2 entry array in `dest_field`
`array` and `hash` will not work

2、配置文件的写法

filter {
    mutate {
        # 1、字段合并
        merge => {
            "user_real_name" => "user_english_name"
        }
    }
}

3、执行结果

13、copy 复制字段

1、配置文件的写法

filter {
    mutate {
        # 1、字段复制, 如果 user_name 这个字段已经存在了,那么此字段的值会被覆盖,否则新增一个字段的值
        copy => {
            "user_real_name" => "user_name"
        }
    }
}

2、执行结果

四、mutate的优先级

1、mutate 在配置文件中的执行顺序

coerce、rename、update、replace、convert、gsub、uppercase、capitalize、lowercase、strip、remove、split、join merge、copy

corece最先执行,copy最后执行。

2、多个mutate块的优先级

filter {
    # mutate块1 会比 下方的 mutate2 先执行
    mutate {}
    # mutate块2
    mutate {}
}

注意⚠️:

假设 我们需要字段 age先复制copy一下,然后在转换一下数据类型convert,那么可以使用上方的 多个 mutate 块来执行。

1、配置文件的写法

filter {
      # 测试多 mutate 块的优先级
    mutate {
        copy => {
            "age" => "new_age"
        }
    }
    mutate {
        convert => {
            "age" => "string"
        }
    }
}

将 convert 和 copy 放置在一个 mutate 块中会发现结果不一样。

2、执行结果

五、参考文档

1、https://www.elastic.co/guide/en/logstash/7.12/working-with-plugins.html

2、https://www.elastic.co/guide/en/logstash/current/plugins-codecs-csv.html

3、https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

03-05 22:03