问题描述
我有一个logstash输入设置
I have a logstash input setup as
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
}
}
我需要在Elasticsearch中将主题输入两个不同的索引中.谁能帮助我解决应如何设置此任务的输出.目前,我只能设置
I need to feed the topics into two different indexes in elasticsearch. Can anyone help me with how the ouput should be setup for such a task. At this time I am only able to setup
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
}
}
在同一个Elasticsearch实例上我需要两个索引,分别为index1
和index2
,这些索引将由来自topic1
和topic2
I need two indexes on the same elasticsearch instance say index1
and index2
which will be fed by messages coming in on topic1
and topic2
推荐答案
首先,您需要在kafka
输入中添加decorate_events
以便知道消息来自哪个主题
First, you need to add decorate_events
to your kafka
input in order to know from which topic the message is coming
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
decorate_events => true
}
}
然后,您有两个选择,都涉及条件逻辑.第一种是通过引入一个过滤器来根据主题名称添加正确的索引名称.为此,您需要添加
Then, you have two options, both involving conditional logic. The first is by introducing a filter for adding the correct index name depending on the topic name. For this you need to add
filter {
if [kafka][topic] == "topic1" {
mutate {
add_field => {"[@metadata][index]" => "index1"}
}
} else {
mutate {
add_field => {"[@metadata][index]" => "index2"}
}
}
# remove the field containing the decorations, unless you want them to land into ES
mutate {
remove_field => ["kafka"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][index]}"
codec => "json"
document_id => "%{id}"
}
}
第二个选择是像这样直接在输出部分执行if/else(但附加的kafka
字段将落入ES中):
Then second option is to do the if/else directly in the output section, like this (but the additional kafka
field will land into ES):
output {
if [@metadata][kafka][topic] == "topic1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index1"
codec => "json"
document_id => "%{id}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "index2"
codec => "json"
document_id => "%{id}"
}
}
}
这篇关于logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!